Skip to content

Commit

Permalink
[improve][client] Add newMessage with schema and transactions
Browse files Browse the repository at this point in the history
Pulsar Client allows callers to create messages with a schema or a transaction, but
not both. This commit adds a new method in the producer that allows callers to create
a message with both a schema and transaction.
  • Loading branch information
Ómar Kjartan Yasin committed Feb 6, 2025
1 parent cdab2d6 commit d77ad02
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ public interface Producer<T> extends Closeable {
* @since 2.7.0
*/
TypedMessageBuilder<T> newMessage(Transaction txn);

/**
* Create a new message builder with transaction and schema, not required same parameterized type with the
* producer.
*
* <p>After the transaction commit, it will be made visible to consumer.
*
* <p>After the transaction abort, it will never be visible to consumer.
*
* @return a typed message builder that can be used to construct the message to be sent through this producer
* @see #newMessage()
*/
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
Transaction txn);

/**
* Get the last sequence id that was published by this producer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public TypedMessageBuilder<T> newMessage() {
return new TypedMessageBuilderImpl<>(this, schema);
}

@Override
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) {
checkArgument(schema != null);
return new TypedMessageBuilderImpl<>(this, schema);
Expand All @@ -92,6 +93,14 @@ public TypedMessageBuilder<T> newMessage(Transaction txn) {
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
}

@Override
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
Transaction txn) {
checkArgument(txn instanceof TransactionImpl);
checkArgument(schema != null);
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
}

abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);

abstract CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn);
Expand Down

0 comments on commit d77ad02

Please sign in to comment.