1package natsio
2
3import (
4 "encoding/json"
5 "github.com/nats-io/nats.go"
6 "github.com/openrecce/pusher/internal/config"
7 "github.com/rs/zerolog/log"
8 "io"
9 "os"
10 "sync"
11 "time"
12)
13
14type Nats struct {
15 Conn *nats.Conn
16 EncConn *nats.EncodedConn
17 once sync.Once
18}
19
// Encoder reads one JSON value from v, decodes it into ptr, and returns the
// decoded value serialized back to JSON.
//
// The decoder is given the address of the ptr interface value itself. When
// ptr holds a non-nil pointer the data is stored in the value it points to;
// when ptr is nil the input is decoded into a generic value and that generic
// value is what gets re-marshaled.
//
// A decode or marshal failure is returned unchanged together with a nil
// slice.
func Encoder(v io.Reader, ptr interface{}) ([]byte, error) {
	if err := json.NewDecoder(v).Decode(&ptr); err != nil {
		return nil, err
	}
	// json.Marshal already yields (nil, err) on failure, so its result can be
	// returned directly.
	return json.Marshal(ptr)
}
33
34func Connect(name string) (*nats.Conn, error) {
35 uri := config.AppConfig().Nats.URI
36 opts := []nats.Option{nats.Name(name)}
37 opts = setupConnOptions(opts)
38 nc, err := nats.Connect(
39 uri,
40 opts...,
41 )
42 if err != nil {
43 return nil, err
44 }
45
46 return nc, nil
47}
48func EConnect(nc *nats.Conn) (*nats.EncodedConn, error) {
49 ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
50 if err != nil {
51 return nil, err
52 }
53 return ec, nil
54}
55
56func setupConnOptions(opts []nats.Option) []nats.Option {
57 totalWait := 10 * time.Minute
58 reconnectDelay := time.Second
59 pingInterval := 20 * time.Second
60 maxPingOutstanding := 5
61 timeout := 30 * time.Second
62
63 opts = append(opts, nats.Timeout(timeout))
64 opts = append(opts, nats.PingInterval(pingInterval))
65 opts = append(opts, nats.MaxPingsOutstanding(maxPingOutstanding))
66 opts = append(opts, nats.ReconnectWait(reconnectDelay))
67 opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay)))
68 opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
69 log.Printf("Disconnected due to: %s, will attempt reconnects for %.0fm", err, totalWait.Minutes())
70 }))
71 opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
72 log.Printf("Reconnected [%s]", nc.ConnectedUrl())
73 }))
74 opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
75 log.Error().Msgf("Exiting: %v", nc.LastError())
76 os.Exit(1)
77 }))
78 opts = append(opts, nats.DrainTimeout(10*time.Second))
79 return opts
80}