Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple MLS messages from messagev1 #333

Merged
merged 13 commits into from
Jan 19, 2024
Merged
2 changes: 2 additions & 0 deletions dev/generate
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
set -e

go generate ./...

mockgen -package api github.com/xmtp/proto/v3/go/mls/api/v1 MlsApi_SubscribeGroupMessagesServer,MlsApi_SubscribeWelcomeMessagesServer > pkg/mls/api/v1/mock.gen.go
4 changes: 4 additions & 0 deletions dev/migrate-mls
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
set -e

dev/run --create-mls-migration "$@"
snormore marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions dev/run
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set -e

MESSAGE_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
MLS_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
AUTHZ_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
NODE_KEY="8a30dcb604b0b53627a5adc054dbf434b446628d4bd1eccc681d223f0550ce67"

Expand All @@ -13,6 +14,7 @@ go run cmd/xmtpd/main.go \
--store.db-connection-string "${MESSAGE_DB_DSN}" \
--store.reader-db-connection-string "${MESSAGE_DB_DSN}" \
--store.metrics-period 5s \
--mls-store.db-connection-string "${MESSAGE_DB_DSN}" \
--authz-db-connection-string "${AUTHZ_DB_DSN}" \
--go-profiling \
"$@"
2 changes: 1 addition & 1 deletion dev/up
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if ! which golangci-lint &>/dev/null; then brew install golangci-lint; fi
if ! which shellcheck &>/dev/null; then brew install shellcheck; fi
if ! which protoc &>/dev/null; then brew install protobuf; fi
if ! which protoc-gen-go &>/dev/null; then go install google.golang.org/protobuf/cmd/protoc-gen-go@latest; fi
if ! which mockgen &>/dev/null; then go install github.com/golang/mock/mockgen@latest; fi
if ! which mockgen &>/dev/null; then go install go.uber.org/mock/mockgen@latest; fi
if ! which protolint &>/dev/null; then go install github.com/yoheimuta/protolint/cmd/protolint@latest; fi

dev/generate
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.20

require (
github.com/ethereum/go-ethereum v1.10.26
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
Expand All @@ -30,8 +29,9 @@ require (
github.com/uptrace/bun/driver/pgdriver v1.1.16
github.com/waku-org/go-waku v0.8.0
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3
github.com/xmtp/proto/v3 v3.36.3-0.20240111132545-4e1d2b1b2399
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0
github.com/yoheimuta/protolint v0.39.0
go.uber.org/mock v0.4.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.53.0
Expand Down Expand Up @@ -75,6 +75,7 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1146,8 +1146,8 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 h1:wzUffJGCTBGXIDyNU+1UBu1fn2Nzo+OQzM1pLrheh58=
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3/go.mod h1:bJREWk+NDnZYjgLQdAi8SUWuq/5pkMme4GqiffEhUF4=
github.com/xmtp/proto/v3 v3.36.3-0.20240111132545-4e1d2b1b2399 h1:i5qynxHZRn7mIXQPt8M7c6ac0NBb+MEn2g2qKzvRTyM=
github.com/xmtp/proto/v3 v3.36.3-0.20240111132545-4e1d2b1b2399/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0 h1:eGNiXDTiXcXTf5ne4HACbqbHaQrVlRz2hwcn05E7v8U=
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/yoheimuta/go-protoparser/v4 v4.6.0 h1:uvz1e9/5Ihsm4Ku8AJeDImTpirKmIxubZdSn0QJNdnw=
github.com/yoheimuta/go-protoparser/v4 v4.6.0/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8=
Expand Down Expand Up @@ -1182,6 +1182,8 @@ go.uber.org/fx v1.20.0 h1:ZMC/pnRvhsthOZh9MZjMq5U8Or3mA9zBSPaLnzs3ihQ=
go.uber.org/fx v1.20.0/go.mod h1:qCUj0btiR3/JnanEr1TYEePfSw6o/4qYJscgvzQ5Ub0=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down
18 changes: 1 addition & 17 deletions pkg/api/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func Test_AuthnNoToken(t *testing.T) {
})
}

func Test_AuthnNoTokenNonMLS(t *testing.T) {
func Test_AuthnNoTokenNonV0(t *testing.T) {
ctx := context.Background()
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, server *Server) {
_, err := client.Publish(ctx, &messageV1.PublishRequest{
Expand All @@ -36,22 +36,6 @@ func Test_AuthnNoTokenNonMLS(t *testing.T) {
})
}

func Test_AuthnNoTokenMLS(t *testing.T) {
ctx := context.Background()
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, server *Server) {
_, err := client.Publish(ctx, &messageV1.PublishRequest{
Envelopes: []*messageV1.Envelope{
{
ContentTopic: "/xmtp/mls/1/m-0x1234/proto",
TimestampNs: 0,
Message: []byte{},
},
},
})
require.NoError(t, err)
})
}

func Test_AuthnNoTokenMixedV0MLS(t *testing.T) {
ctx := context.Background()
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, server *Server) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ type AuthnOptions struct {
Authenticated requests will be permitted according to the rules of the request type,
(i.e. you can't publish into other wallets' contact and private topics).
*/
Enable bool `long:"enable" description:"require client authentication via wallet tokens"`
EnableMLS bool `long:"enable-mls" description:"require client authentication for MLS"`
Enable bool `long:"enable" description:"require client authentication via wallet tokens"`
/*
Ratelimits enables request rate limiting.

Expand Down
18 changes: 2 additions & 16 deletions pkg/api/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,9 @@ func (wa *WalletAuthorizer) Stream() grpc.StreamServerInterceptor {
}
}

func (wa *WalletAuthorizer) isProtocolMLS(request *messagev1.PublishRequest) bool {
envelopes := request.Envelopes
if len(envelopes) == 0 {
return false
}
// If any of the envelopes are not for a v3 topic, then we treat the request as non-v3
for _, envelope := range envelopes {
if !strings.HasPrefix(envelope.ContentTopic, "/xmtp/mls/") {
return false
}
}
return true
}

func (wa *WalletAuthorizer) requiresAuthorization(req interface{}) bool {
publishRequest, isPublish := req.(*messagev1.PublishRequest)
return isPublish && (!wa.isProtocolMLS(publishRequest) || wa.AuthnConfig.EnableMLS)
snormore marked this conversation as resolved.
Show resolved Hide resolved
_, isPublish := req.(*messagev1.PublishRequest)
return isPublish
}

func (wa *WalletAuthorizer) getWallet(ctx context.Context) (types.WalletAddr, error) {
Expand Down
67 changes: 24 additions & 43 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ import (
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
wakunode "github.com/waku-org/go-waku/waku/v2/node"
wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
wakurelay "github.com/waku-org/go-waku/waku/v2/protocol/relay"
proto "github.com/xmtp/proto/v3/go/message_api/v1"
apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context"
"github.com/xmtp/xmtp-node-go/pkg/logging"
"github.com/xmtp/xmtp-node-go/pkg/metrics"
"github.com/xmtp/xmtp-node-go/pkg/store"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -45,24 +42,24 @@ type Service struct {

// Configured as constructor options.
log *zap.Logger
waku *wakunode.WakuNode
store *store.Store

publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error

// Configured internally.
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
relaySub *wakurelay.Subscription

ns *server.Server
nc *nats.Conn
}

func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store) (s *Service, err error) {
func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
s = &Service{
waku: node,
log: logger.Named("message/v1"),
store: store,
log: log.Named("message/v1"),
store: store,
publishToWakuRelay: publishToWakuRelay,
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

Expand All @@ -82,44 +79,11 @@ func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store)
return nil, err
}

// Initialize waku relay subscription.
s.relaySub, err = s.waku.Relay().Subscribe(s.ctx)
if err != nil {
return nil, errors.Wrap(err, "subscribing to relay")
}
tracing.GoPanicWrap(s.ctx, &s.wg, "broadcast", func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case wakuEnv := <-s.relaySub.Ch:
if wakuEnv == nil {
continue
}
env := buildEnvelope(wakuEnv.Message())

envB, err := pb.Marshal(env)
if err != nil {
s.log.Error("marshalling envelope", zap.Error(err))
continue
}
err = s.nc.Publish(buildNatsSubject(env.ContentTopic), envB)
if err != nil {
s.log.Error("publishing envelope to local nats", zap.Error(err))
continue
}
}
}
})

return s, nil
}

func (s *Service) Close() {
s.log.Info("closing")
if s.relaySub != nil {
s.relaySub.Unsubscribe()
}

if s.ctxCancel != nil {
s.ctxCancel()
Expand All @@ -136,6 +100,22 @@ func (s *Service) Close() {
s.log.Info("closed")
}

func (s *Service) HandleIncomingWakuRelayMessage(msg *wakupb.WakuMessage) error {
env := buildEnvelope(msg)

envB, err := pb.Marshal(env)
if err != nil {
return err
}

err = s.nc.Publish(buildNatsSubject(env.ContentTopic), envB)
if err != nil {
return err
}

return nil
}

func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*proto.PublishResponse, error) {
for _, env := range req.Envelopes {
log := s.log.Named("publish").With(zap.String("content_topic", env.ContentTopic))
Expand All @@ -156,14 +136,15 @@ func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*prot
}
}

_, err := s.waku.Relay().Publish(ctx, &wakupb.WakuMessage{
err := s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: env.ContentTopic,
Timestamp: int64(env.TimestampNs),
Payload: env.Message,
})
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}

metrics.EmitPublishedEnvelope(ctx, log, env)
}
return &proto.PublishResponse{}, nil
Expand Down
Loading