Skip to content

Commit

Permalink
More comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Sep 23, 2024
1 parent 9a01159 commit e90e669
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
4 changes: 3 additions & 1 deletion cmd/jetstream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []
getJSONEvent := func() []byte { return asJSON }
getCompressedEvent := func() []byte { return compBytes }

// Concurrently emit to all subscribers
// We can't move on until all subscribers have received the event or been dropped for being too slow
sem := semaphore.NewWeighted(maxConcurrentEmits)
for _, sub := range s.Subscribers {
if err := sem.Acquire(ctx, 1); err != nil {
Expand All @@ -106,7 +108,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []
return
}

// Pick the event valuer for the subscriber
// Pick the event valuer for the subscriber based on their compression preference
getEventBytes := getJSONEvent
if sub.compress {
getEventBytes = getCompressedEvent
Expand Down
7 changes: 6 additions & 1 deletion pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,19 +343,24 @@ func (c *Consumer) RunSequencer(ctx context.Context) error {
e.TimeUS = c.clock.Now()
c.sequenced.Inc()

// Encode the event in JSON and compress it
// Serialize the event as JSON
asJSON, err := json.Marshal(e)
if err != nil {
log.Error("failed to marshal event", "error", err)
return
}

// Compress the serialized JSON using zstd
compBytes := c.encoder.EncodeAll(asJSON, nil)

// Persist the event to the uncompressed and compressed DBs
if err := c.PersistEvent(ctx, e, asJSON, compBytes); err != nil {
log.Error("failed to persist event", "error", err)
return
}
c.persisted.Inc()

// Emit the event to subscribers
if err := c.Emit(ctx, e, asJSON, compBytes); err != nil {
log.Error("failed to emit event", "error", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/consumer/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event, asJSON,
key = []byte(fmt.Sprintf("%d_%s", evt.TimeUS, evt.Did))
}

// Write the event to the uncompressed DB
// Write the uncompressed event to the uncompressed DB
err := c.UncompressedDB.Set(key, asJSON, pebble.NoSync)
if err != nil {
log.Error("failed to write event to pebble", "error", err)
return fmt.Errorf("failed to write event to pebble: %w", err)
}

// Compress the event and write it to the compressed DB
// Write the compressed event to the compressed DB
err = c.CompressedDB.Set(key, compBytes, pebble.NoSync)
if err != nil {
log.Error("failed to write compressed event to pebble", "error", err)
Expand Down

0 comments on commit e90e669

Please sign in to comment.