diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index b4ca6ab3b3d..089fd39b3e9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -36,8 +36,8 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; /** - * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation {@link - * DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages. + * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation + * {@link DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages. */ @Deprecated public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer { @@ -375,6 +375,20 @@ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offs this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback); } + @Override + public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector, + long offset, int maxNums, + PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums, pullCallback); + } + + @Override + public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector selector, + long offset, + int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQPullConsumerImpl.pullBlockIfNotFoundWithMessageSelector(mq, selector, offset, maxNums); + } + @Override public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException { this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index 868ee93ff8a..ee77b12bbc8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -47,8 +47,7 @@ public interface MQPullConsumer extends MQConsumer { * * @param mq from which message queue * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if - * null or * expression,meaning subscribe - * all + * null or * expression,meaning subscribe all * @param offset from where to pull * @param maxNums max pulling numbers * @return The resulting {@code PullRequest} @@ -121,7 +120,7 @@ void pull(final MessageQueue mq, final String subExpression, final long offset, InterruptedException; /** - * Pulling the messages in a async. way. Support message selection + * Pulling the messages in a async way. Support message selection */ void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, @@ -150,6 +149,23 @@ void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, fina final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException; + /** + * Pulling the messages through callback function,if no message arrival,blocking. Support message selection + */ + void pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector, + final long offset, final int maxNums, + final PullCallback pullCallback) throws MQClientException, RemotingException, + InterruptedException; + + /** + * Pulling the messages,if no message arrival,blocking some time. Support message selection + * + * @return The resulting {@code PullRequest} + */ + PullResult pullBlockIfNotFoundWithMessageSelector(final MessageQueue mq, final MessageSelector selector, + final long offset, final int maxNums) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; + /** * Update the offset */ diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 91d72989cab..c877ccc0702 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -589,6 +589,21 @@ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offs this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } + public void pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback) + throws MQClientException, RemotingException, InterruptedException { + SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); + this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true, + this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); + } + + public PullResult pullBlockIfNotFoundWithMessageSelector(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) + throws MQClientException, RemotingException, InterruptedException, MQBrokerException { + SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); + } + + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { this.isRunning();