snips.sh

 1package natsio
 2
 3import (
 4	"encoding/json"
 5	"github.com/nats-io/nats.go"
 6	"github.com/openrecce/pusher/internal/config"
 7	"github.com/rs/zerolog/log"
 8	"io"
 9	"os"
10	"sync"
11	"time"
12)
13
14type Nats struct {
15	Conn    *nats.Conn
16	EncConn *nats.EncodedConn
17	once    sync.Once
18}
19
// Encoder reads the next JSON value from v, decodes it into ptr, and returns
// the result re-marshaled as compact JSON.
//
// The decoder is handed &ptr (the address of the interface value). With
// encoding/json this means a non-nil pointer stored in ptr is filled in place,
// while a nil or non-pointer ptr falls back to the generic representation
// (map[string]interface{}, []interface{}, ...) instead of failing.
//
// Decode and marshal errors are returned unchanged, with a nil byte slice.
func Encoder(v io.Reader, ptr interface{}) ([]byte, error) {
	if decodeErr := json.NewDecoder(v).Decode(&ptr); decodeErr != nil {
		return nil, decodeErr
	}

	encoded, marshalErr := json.Marshal(ptr)
	if marshalErr != nil {
		return nil, marshalErr
	}
	return encoded, nil
}
33
34func Connect(name string) (*nats.Conn, error) {
35	uri := config.AppConfig().Nats.URI
36	opts := []nats.Option{nats.Name(name)}
37	opts = setupConnOptions(opts)
38	nc, err := nats.Connect(
39		uri,
40		opts...,
41	)
42	if err != nil {
43		return nil, err
44	}
45
46	return nc, nil
47}
48func EConnect(nc *nats.Conn) (*nats.EncodedConn, error) {
49	ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
50	if err != nil {
51		return nil, err
52	}
53	return ec, nil
54}
55
56func setupConnOptions(opts []nats.Option) []nats.Option {
57	totalWait := 10 * time.Minute
58	reconnectDelay := time.Second
59	pingInterval := 20 * time.Second
60	maxPingOutstanding := 5
61	timeout := 30 * time.Second
62
63	opts = append(opts, nats.Timeout(timeout))
64	opts = append(opts, nats.PingInterval(pingInterval))
65	opts = append(opts, nats.MaxPingsOutstanding(maxPingOutstanding))
66	opts = append(opts, nats.ReconnectWait(reconnectDelay))
67	opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay)))
68	opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
69		log.Printf("Disconnected due to: %s, will attempt reconnects for %.0fm", err, totalWait.Minutes())
70	}))
71	opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
72		log.Printf("Reconnected [%s]", nc.ConnectedUrl())
73	}))
74	opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
75		log.Error().Msgf("Exiting: %v", nc.LastError())
76		os.Exit(1)
77	}))
78	opts = append(opts, nats.DrainTimeout(10*time.Second))
79	return opts
80}