From d11dde7190c5d3e3d088c635736c9f394416dc21 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Mon, 26 Aug 2024 12:27:43 +0100 Subject: [PATCH 1/4] Set random pk in kinesis source --- pkg/source/kinesis/kinesis_source.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index d9ad9fb2..613b7f2a 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ b/pkg/source/kinesis/kinesis_source.go @@ -231,7 +231,7 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { messages := []*models.Message{ { Data: record.Data, - PartitionKey: *record.PartitionKey, + PartitionKey: uuid.NewV4().String(), AckFunc: ackFunc, TimeCreated: timeCreated, TimePulled: timePulled, From 2465e8328ed6ae71d1b1af8cd25b65bd7cfcdd91 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Mon, 26 Aug 2024 16:39:48 +0100 Subject: [PATCH 2/4] bump versions for test release --- VERSION | 2 +- cmd/constants.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 005119ba..7ae1af74 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.4.1 +2.4.2-test diff --git a/cmd/constants.go b/cmd/constants.go index af4ab80b..e63bc7e3 100644 --- a/cmd/constants.go +++ b/cmd/constants.go @@ -13,7 +13,7 @@ package cmd const ( // AppVersion is the current version of the app - AppVersion = "2.4.1" + AppVersion = "2.4.2-test" // AppName is the name of the application to use in logging / places that require the artifact AppName = "snowbridge" From 36f6d59c87b578385b807180488f7644739206a3 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Tue, 27 Aug 2024 17:21:36 +0100 Subject: [PATCH 3/4] Use update UUID package for UUIDs --- go.mod | 7 ++----- go.sum | 6 ------ pkg/source/kafka/kafka_source.go | 6 ++++-- pkg/source/kinesis/kinesis_source.go | 10 +++++++--- pkg/source/pubsub/pubsub_source.go | 7 +++++-- pkg/source/sqs/sqs_source.go | 6 ++++-- pkg/source/stdin/stdin_source.go | 6 ++++-- pkg/target/eventhub_test.go | 4 ++-- pkg/telemetry/telemetry.go | 4 ++-- pkg/testutil/common.go | 6 +++--- 10 files changed, 33 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 1c910cd7..76323343 100644 --- a/go.mod +++ b/go.mod @@ -17,13 +17,12 @@ require ( github.com/getsentry/sentry-go v0.27.0 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/uuid v1.6.0 github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/compress v1.17.8 // indirect github.com/mitchellh/mapstructure v1.5.0 - github.com/myesui/uuid v1.0.0 // indirect github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.3 github.com/smira/go-statsd v1.3.3 @@ -31,19 +30,17 @@ require ( github.com/snowplow-devops/go-sentryhook v0.0.0-20210106082031-21bf7f9dac2a github.com/snowplow/snowplow-golang-analytics-sdk v0.3.0 github.com/stretchr/testify v1.9.0 - github.com/twinj/uuid v1.0.0 github.com/twitchscience/kinsumer v0.0.0-20240315191529-9a48088063ec github.com/urfave/cli v1.22.14 github.com/xdg/scram v1.0.5 golang.org/x/crypto v0.22.0 // indirect golang.org/x/net v0.24.0 // indirect - golang.org/x/oauth2 v0.19.0 // indirect + golang.org/x/oauth2 v0.19.0 golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/api v0.172.0 // indirect google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda google.golang.org/grpc v1.63.2 - gopkg.in/stretchr/testify.v1 v1.2.2 // indirect ) require ( diff --git a/go.sum b/go.sum index 3c25d1e4..e19f73f3 100644 --- a/go.sum +++ b/go.sum @@ -297,8 +297,6 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= -github.com/myesui/uuid v1.0.0 h1:xCBmH4l5KuvLYc5L7AS7SZg9/jKdIFubM7OVoLqaQUI= -github.com/myesui/uuid v1.0.0/go.mod h1:2CDfNgU0LR8mIdO8vdWd8i9gWWxLlcoIGGpSNgafq84= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= @@ -365,8 +363,6 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twinj/uuid v1.0.0 h1:fzz7COZnDrXGTAOHGuUGYd6sG+JMq+AoE7+Jlu0przk= -github.com/twinj/uuid v1.0.0/go.mod h1:mMgcE1RHFUFqe5AfiwlINXisXfDGro23fWdPUfOMjRY= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= @@ -558,8 +554,6 @@ gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8 gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= gopkg.in/ini.v1 v1.51.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= -gopkg.in/stretchr/testify.v1 v1.2.2 h1:yhQC6Uy5CqibAIlk1wlusa/MJ3iAN49/BsR/dCCKz3M= -gopkg.in/stretchr/testify.v1 v1.2.2/go.mod h1:QI5V/q6UbPmuhtm10CaFZxED9NreB8PnFYN9JcR6TxU= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/source/kafka/kafka_source.go b/pkg/source/kafka/kafka_source.go index 1ddc54c9..110d1891 100644 --- a/pkg/source/kafka/kafka_source.go +++ b/pkg/source/kafka/kafka_source.go @@ -19,13 +19,13 @@ import ( "time" "github.com/IBM/sarama" + "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/source/sourceiface" - "github.com/twinj/uuid" ) // Configuration configures the source for records @@ -92,7 +92,7 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai newMessage := &models.Message{ Data: message.Value, - PartitionKey: uuid.NewV4().String(), + PartitionKey: uuid.New().String(), TimeCreated: message.Timestamp, TimePulled: time.Now().UTC(), } @@ -284,6 +284,8 @@ func newKafkaSource(cfg *Configuration) (*kafkaSource, error) { // newKafkaSourceWithInterfaces creates a new source for reading messages from Apache Kafka, allowing the user to provide a mocked client. func newKafkaSourceWithInterfaces(client sarama.ConsumerGroup, s *kafkaSource) (*kafkaSource, error) { + // Ensures as even as possible distribution of UUIDs + uuid.EnableRandPool() s.client = client return s, nil } diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index 613b7f2a..e5d3d3bb 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ b/pkg/source/kinesis/kinesis_source.go @@ -20,10 +20,10 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" + "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/twinj/uuid" "github.com/twitchscience/kinsumer" "github.com/snowplow/snowbridge/config" @@ -181,8 +181,11 @@ func newKinesisSourceWithInterfaces( WithIteratorStartTimestamp(startTimestamp). WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond) + // Ensures as even as possible distribution of UUIDs + uuid.EnableRandPool() + // TODO: See if the client name can be reused to survive same node reboots - name := uuid.NewV4().String() + name := uuid.New().String() k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config) if err != nil { @@ -228,10 +231,11 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { if record != nil { timeCreated := record.ApproximateArrivalTimestamp.UTC() + messages := []*models.Message{ { Data: record.Data, - PartitionKey: uuid.NewV4().String(), + PartitionKey: uuid.New().String(), AckFunc: ackFunc, TimeCreated: timeCreated, TimePulled: timePulled, diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index 3a12cee0..1630288a 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -17,9 +17,9 @@ import ( "time" "cloud.google.com/go/pubsub" + "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/twinj/uuid" "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" @@ -96,6 +96,9 @@ var ConfigPair = config.ConfigurationPair{ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string) (*pubSubSource, error) { ctx := context.Background() + // Ensures as even as possible distribution of UUIDs + uuid.EnableRandPool() + client, err := pubsub.NewClient(ctx, projectID) if err != nil { return nil, errors.Wrap(err, "Failed to create PubSub client") @@ -137,7 +140,7 @@ func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error { messages := []*models.Message{ { Data: msg.Data, - PartitionKey: uuid.NewV4().String(), + PartitionKey: uuid.New().String(), AckFunc: ackFunc, TimeCreated: timeCreated, TimePulled: timePulled, diff --git a/pkg/source/sqs/sqs_source.go b/pkg/source/sqs/sqs_source.go index f81baad2..7743c435 100644 --- a/pkg/source/sqs/sqs_source.go +++ b/pkg/source/sqs/sqs_source.go @@ -20,9 +20,9 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/twinj/uuid" "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/common" @@ -121,6 +121,8 @@ var ConfigPair = config.ConfigurationPair{ // newSQSSourceWithInterfaces allows you to provide an SQS client directly to allow // for mocking and localstack usage func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) { + // Ensures as even as possible distribution of UUIDs + uuid.EnableRandPool() return &sqsSource{ client: client, queueName: queueName, @@ -228,7 +230,7 @@ func (ss *sqsSource) process(sf *sourceiface.SourceFunctions) error { messages = append(messages, &models.Message{ Data: []byte(*msg.Body), - PartitionKey: uuid.NewV4().String(), + PartitionKey: uuid.New().String(), AckFunc: ackFunc, TimeCreated: timeCreated, TimePulled: timePulled, diff --git a/pkg/source/stdin/stdin_source.go b/pkg/source/stdin/stdin_source.go index 72e42555..d163cc47 100644 --- a/pkg/source/stdin/stdin_source.go +++ b/pkg/source/stdin/stdin_source.go @@ -17,9 +17,9 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/twinj/uuid" "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" @@ -84,6 +84,8 @@ var ConfigPair = config.ConfigurationPair{ // newStdinSource creates a new client for reading messages from stdin func newStdinSource(concurrentWrites int) (*stdinSource, error) { + // Ensures as even as possible distribution of UUIDs + uuid.EnableRandPool() return &stdinSource{ concurrentWrites: concurrentWrites, log: log.WithFields(log.Fields{"source": "stdin"}), @@ -103,7 +105,7 @@ func (ss *stdinSource) Read(sf *sourceiface.SourceFunctions) error { messages := []*models.Message{ { Data: []byte(scanner.Text()), - PartitionKey: uuid.NewV4().String(), + PartitionKey: uuid.New().String(), TimeCreated: timeNow, TimePulled: timeNow, }, diff --git a/pkg/target/eventhub_test.go b/pkg/target/eventhub_test.go index ed6c8c1e..c6d876cc 100644 --- a/pkg/target/eventhub_test.go +++ b/pkg/target/eventhub_test.go @@ -21,9 +21,9 @@ import ( "time" eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/google/uuid" "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "github.com/twinj/uuid" "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/testutil" @@ -71,7 +71,7 @@ func (m mockHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, } for !iterator.Done() { - id := uuid.NewV4() + id := uuid.New() batch, err := iterator.Next(id.String(), batchOptions) if err != nil { diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index a3ac519b..f0558a49 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -5,10 +5,10 @@ import ( "net/http" "time" + "github.com/google/uuid" log "github.com/sirupsen/logrus" conf "github.com/snowplow/snowbridge/config" gt "github.com/snowplow/snowplow-golang-tracker/v2/tracker" - "github.com/twinj/uuid" ) // config holds the configuration for telemetry @@ -36,7 +36,7 @@ func newTelemetryWithConfig(cfg *conf.Config) *config { userProvidedID: cfg.Data.UserProvidedID, applicationName: applicationName, applicationVersion: applicationVersion, - appGeneratedID: uuid.NewV4().String(), + appGeneratedID: uuid.New().String(), } } diff --git a/pkg/testutil/common.go b/pkg/testutil/common.go index df168f98..90d27e74 100644 --- a/pkg/testutil/common.go +++ b/pkg/testutil/common.go @@ -16,7 +16,7 @@ import ( "math/rand" "time" - "github.com/twinj/uuid" + "github.com/google/uuid" "github.com/snowplow/snowbridge/pkg/models" ) @@ -45,7 +45,7 @@ func GetTestMessages(count int, body string, ackFunc func()) []*models.Message { for i := 0; i < count; i++ { messages = append(messages, &models.Message{ Data: []byte(body), - PartitionKey: uuid.NewV4().String(), + PartitionKey: uuid.New().String(), AckFunc: ackFunc, }) } @@ -59,7 +59,7 @@ func GetSequentialTestMessages(count int, ackFunc func()) []*models.Message { for i := 0; i < count; i++ { messages = append(messages, &models.Message{ Data: []byte(fmt.Sprint(i)), - PartitionKey: uuid.NewV4().String(), + PartitionKey: uuid.New().String(), AckFunc: ackFunc, }) } From c0230beab2d4b830ae7f9730256bf6fea45dcd6e Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Tue, 27 Aug 2024 17:21:47 +0100 Subject: [PATCH 4/4] Bump version for test release --- VERSION | 2 +- cmd/constants.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 7ae1af74..2eff27b3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.4.2-test +2.4.2-test2 diff --git a/cmd/constants.go b/cmd/constants.go index e63bc7e3..3312d51f 100644 --- a/cmd/constants.go +++ b/cmd/constants.go @@ -13,7 +13,7 @@ package cmd const ( // AppVersion is the current version of the app - AppVersion = "2.4.2-test" + AppVersion = "2.4.2-test2" // AppName is the name of the application to use in logging / places that require the artifact AppName = "snowbridge"