Files
cish/controller/listener.go
2025-08-23 13:45:10 +02:00

78 lines
1.2 KiB
Go

package main
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func Listener[T any](
ctx context.Context,
db *pgxpool.Pool,
channel string,
handler func(T),
) {
for {
processAllEvents(ctx, db, channel, handler)
listenNewEvents(ctx, db, channel, handler)
select {
case <-ctx.Done():
return
default:
// retry in case of connection failure
time.Sleep(30 * time.Second)
}
}
}
func processAllEvents[T any](
ctx context.Context,
db *pgxpool.Pool,
channel string,
handler func(T),
) error {
rows, err := db.Query(ctx, fmt.Sprintf("select * from %s", channel))
if err != nil {
return err
}
events, err := pgx.CollectRows(rows, pgx.RowToStructByName[T])
if err != nil {
return err
}
for _, evt := range events {
handler(evt)
}
return nil
}
func listenNewEvents[T any](
ctx context.Context,
db *pgxpool.Pool,
channel string,
handler func(T),
) error {
conn, err := db.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
_, err = conn.Exec(ctx, fmt.Sprintf("listen %s", channel))
if err != nil {
return err
}
for {
_, err := conn.Conn().WaitForNotification(ctx)
if err != nil {
return err
}
processAllEvents(ctx, db, channel, handler)
}
}