Skip to content

Commit

Permalink
Update nsq
Browse files Browse the repository at this point in the history
  • Loading branch information
shenghui0779 committed Dec 4, 2023
1 parent d93333a commit 934657f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 34 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/redis/go-redis/v9 v9.3.0
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.15.0
golang.org/x/crypto v0.16.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

Expand All @@ -29,15 +29,15 @@ require (
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mO
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
Expand Down Expand Up @@ -72,14 +72,14 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
23 changes: 6 additions & 17 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,32 +91,21 @@ func WithLogger(name string, cfg *LoggerConfig) InitOption {
}
}

// WithNSQProducer 设置nsq生产者
func WithNSQProducer(nsqd string, cfg *nsq.Config) InitOption {
// WithNSQ 设置NSQ
func WithNSQ(nsqd string, lookupd []string, cfg *nsq.Config, consumers ...NSQConsumer) InitOption {
return func() {
if err := initNSQProducer(nsqd, cfg); err != nil {
logger.Panic("err nsq producer init", zap.String("nsqd", nsqd), zap.Error(err))
if err := initNSQ(nsqd, lookupd, cfg, consumers...); err != nil {
logger.Panic("err nsq init", zap.String("nsqd", nsqd), zap.Strings("lookupd", lookupd), zap.Error(err))
}

logger.Info("nsq producer is OK")
}
}

// WithNSQConsumers 设置nsq消费者
func WithNSQConsumers(lookupd []string, consumers ...NSQConsumer) InitOption {
return func() {
if err := setNSQConsumers(lookupd, consumers...); err != nil {
logger.Panic("err set nsq consumers", zap.Strings("lookupd", lookupd), zap.Error(err))
}

logger.Info("nsq consumers set OK")
}
}

// WithWebsocket 设置websocket
func WithWebsocket(upgrader *websocket.Upgrader) InitOption {
func WithWebsocket(up *websocket.Upgrader) InitOption {
return func() {
wsupgrader = upgrader
upgrader = up
}
}

Expand Down
9 changes: 7 additions & 2 deletions nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (l *NSQLogger) Output(calldepth int, s string) error {
return nil
}

func initNSQProducer(nsqd string, cfg *nsq.Config) error {
func initNSQ(nsqd string, lookupd []string, cfg *nsq.Config, consumers ...NSQConsumer) error {
if cfg == nil {
cfg = nsq.NewConfig()
}
Expand All @@ -39,6 +39,11 @@ func initNSQProducer(nsqd string, cfg *nsq.Config) error {

producer.SetLogger(&NSQLogger{}, nsq.LogLevelError)

// set consumers
if err = consumerSet(lookupd, consumers...); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -77,7 +82,7 @@ type NSQConsumer interface {
Config() *nsq.Config
}

func setNSQConsumers(lookupd []string, consumers ...NSQConsumer) error {
func consumerSet(lookupd []string, consumers ...NSQConsumer) error {
for _, c := range consumers {
cfg := c.Config()

Expand Down
6 changes: 3 additions & 3 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/gorilla/websocket"
)

var wsupgrader *websocket.Upgrader
var upgrader *websocket.Upgrader

// WSMessage websocket消息
type WSMessage struct {
Expand Down Expand Up @@ -126,11 +126,11 @@ func (c *wsconn) Close(ctx context.Context) error {

// NewWSConn 生成一个websocket连接
func NewWSConn(w http.ResponseWriter, r *http.Request, authFn func(ctx context.Context, msg *WSMessage) (*WSMessage, error)) (WSConn, error) {
if wsupgrader == nil {
if upgrader == nil {
return nil, errors.New("upgrader is nil (forgotten configure?)")
}

c, err := wsupgrader.Upgrade(w, r, nil)
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 934657f

Please sign in to comment.