mirror of
https://github.com/zoriya/cish.git
synced 2025-12-05 23:06:18 +00:00
78 lines
1.2 KiB
Go
78 lines
1.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
func Listener[T any](
|
|
ctx context.Context,
|
|
db *pgxpool.Pool,
|
|
channel string,
|
|
handler func(T),
|
|
) {
|
|
for {
|
|
processAllEvents(ctx, db, channel, handler)
|
|
listenNewEvents(ctx, db, channel, handler)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
// retry in case of connection failure
|
|
time.Sleep(30 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
|
|
func processAllEvents[T any](
|
|
ctx context.Context,
|
|
db *pgxpool.Pool,
|
|
channel string,
|
|
handler func(T),
|
|
) error {
|
|
rows, err := db.Query(ctx, fmt.Sprintf("select * from %s", channel))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
events, err := pgx.CollectRows(rows, pgx.RowToStructByName[T])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, evt := range events {
|
|
handler(evt)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func listenNewEvents[T any](
|
|
ctx context.Context,
|
|
db *pgxpool.Pool,
|
|
channel string,
|
|
handler func(T),
|
|
) error {
|
|
conn, err := db.Acquire(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Release()
|
|
|
|
_, err = conn.Exec(ctx, fmt.Sprintf("listen %s", channel))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
_, err := conn.Conn().WaitForNotification(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
processAllEvents(ctx, db, channel, handler)
|
|
}
|
|
}
|
|
|