Skip to content

Commit

Permalink
Merge pull request #42341 from nosan
Browse files Browse the repository at this point in the history
* pr/42341:
  Polish "Add option for configuring max messages per task"
  Add option for configuring max messages per task

Closes gh-42341
  • Loading branch information
snicoll committed Sep 19, 2024
2 parents e133ea3 + e930a96 commit e950801
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFact
map.from(listenerProperties::isAutoStartup).to(factory::setAutoStartup);
map.from(listenerProperties::formatConcurrency).to(factory::setConcurrency);
map.from(listenerProperties::getReceiveTimeout).as(Duration::toMillis).to(factory::setReceiveTimeout);
map.from(listenerProperties::getMaxMessagesPerTask).to(factory::setMaxMessagesPerTask);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public static class Listener {
*/
private Duration receiveTimeout = Duration.ofSeconds(1);

/**
* Specify the maximum number of messages to process in one task. By default,
* unlimited unless a SchedulingTaskExecutor is configured on the listener (10
* messages), as it indicates a preference for short-lived tasks.
*/
private Integer maxMessagesPerTask;

private final Session session = new Session();

public boolean isAutoStartup() {
Expand Down Expand Up @@ -250,6 +257,14 @@ public void setReceiveTimeout(Duration receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

public Integer getMaxMessagesPerTask() {
return this.maxMessagesPerTask;
}

public void setMaxMessagesPerTask(Integer maxMessagesPerTask) {
this.maxMessagesPerTask = maxMessagesPerTask;
}

public Session getSession() {
return this.session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ void testJmsListenerContainerFactoryWithCustomSettings() {
"spring.jms.listener.session.acknowledgeMode=client",
"spring.jms.listener.session.transacted=false", "spring.jms.listener.minConcurrency=2",
"spring.jms.listener.receiveTimeout=2s", "spring.jms.listener.maxConcurrency=10",
"spring.jms.subscription-durable=true", "spring.jms.client-id=exampleId")
"spring.jms.subscription-durable=true", "spring.jms.client-id=exampleId",
"spring.jms.listener.max-messages-per-task=5")
.run(this::testJmsListenerContainerFactoryWithCustomSettings);
}

Expand All @@ -188,6 +189,7 @@ private void testJmsListenerContainerFactoryWithCustomSettings(AssertableApplica
assertThat(container.getConcurrentConsumers()).isEqualTo(2);
assertThat(container.getMaxConcurrentConsumers()).isEqualTo(10);
assertThat(container).hasFieldOrPropertyWithValue("receiveTimeout", 2000L);
assertThat(container).hasFieldOrPropertyWithValue("maxMessagesPerTask", 5);
assertThat(container.isSubscriptionDurable()).isTrue();
assertThat(container.getClientId()).isEqualTo("exampleId");
}
Expand Down

0 comments on commit e950801

Please sign in to comment.