Skip to content

Commit

Permalink
feat: allow to configure options when calling pull method on subscrip…
Browse files Browse the repository at this point in the history
…tion (#23)

Co-authored-by: jhuteau <[email protected]>
  • Loading branch information
jon-ht and jhuteau authored Aug 23, 2023
1 parent b58a774 commit bf05d29
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 35 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ framework:
options:
client_config: # optional (default: [])
apiEndpoint: 'https://europe-west3-pubsub.googleapis.com'
max_messages_pull: 10 # optional (default: 10)
topic: # optional (default name: messages)
name: 'messages'
options: # optional create options if not exists (default: []), for all options take at look at https://googleapis.github.io/google-cloud-php/#/docs/google-cloud/v0.188.0/pubsub/topic?method=create
Expand All @@ -66,6 +65,9 @@ framework:
labels:
- label1
- label2
pull:
maxMessages: 10 # optional (default: 10)

```
or:
```yaml
Expand All @@ -75,7 +77,7 @@ framework:
messenger:
transports:
gps_transport:
dsn: 'gps://default/messages?client_config[apiEndpoint]=https://europe-west3-pubsub.googleapis.com&max_messages_pull=10'
dsn: 'gps://default/messages?client_config[apiEndpoint]=https://europe-west3-pubsub.googleapis.com&subscription[pull][maxMessages]=10'
```
to use emulator in local:
```yaml
Expand Down
86 changes: 74 additions & 12 deletions Tests/Transport/GpsConfigurationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public function dataProvider(): array
'expectedConfiguration' => new GpsConfiguration(
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL,
[],
[],
[]
[],
['maxMessages' => GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL, 'returnImmediately' => false]
),
],
'Custom topic/subscription name configured through dsn #1' => [
Expand All @@ -52,10 +52,10 @@ public function dataProvider(): array
'expectedConfiguration' => new GpsConfiguration(
'something',
'something',
GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL,
[],
[],
[]
[],
['maxMessages' => GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL, 'returnImmediately' => false]
),
],
'Custom topic/subscription name configured through dsn #2 (deprecated queue[name])' => [
Expand All @@ -64,10 +64,10 @@ public function dataProvider(): array
'expectedConfiguration' => new GpsConfiguration(
'topic_name',
'subscription_name',
GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL,
[],
[],
[]
[],
['maxMessages' => GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL, 'returnImmediately' => false]
),
],
'Custom topic/subscription name configured through dsn #3' => [
Expand All @@ -76,10 +76,10 @@ public function dataProvider(): array
'expectedConfiguration' => new GpsConfiguration(
'topic_name',
'subscription_name',
5,
['apiEndpoint' => 'https://europe-west3-pubsub.googleapis.com'],
['labels' => ['label_topic1']],
['labels' => ['label_subscription1'], 'enableMessageOrdering' => true, 'ackDeadlineSeconds' => 100],
['maxMessages' => 5, 'returnImmediately' => false]
),
],
'Custom topic/subscription name configured through options #1' => [
Expand All @@ -90,10 +90,10 @@ public function dataProvider(): array
'expectedConfiguration' => new GpsConfiguration(
'something',
'something',
GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL,
[],
[],
[]
[],
['maxMessages' => GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL, 'returnImmediately' => false]
),
],
'Custom topic/subscription name configured through options #2' => [
Expand All @@ -105,10 +105,10 @@ public function dataProvider(): array
'expectedConfiguration' => new GpsConfiguration(
'topic_name',
'subscription_name',
GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL,
[],
[],
[]
[],
['maxMessages' => GpsConfigurationResolverInterface::DEFAULT_MAX_MESSAGES_PULL, 'returnImmediately' => false]
),
],
'Custom topic/subscription name configured through options #4' => [
Expand Down Expand Up @@ -136,10 +136,72 @@ public function dataProvider(): array
'expectedConfiguration' => new GpsConfiguration(
'topic_name1',
'subscription_name',
5,
['apiEndpoint' => 'https://europe-west3-pubsub.googleapis.com'],
['labels' => ['label_topic1']],
['labels' => ['label_subscription1'], 'enableMessageOrdering' => true, 'ackDeadlineSeconds' => 100],
['maxMessages' => 5, 'returnImmediately' => false]
),
],
'Custom subscription pull options configured through dsn #1 (deprecated max_messages_pull)' => [
'dsn' => 'gps://default?max_messages_pull=5',
'options' => [],
'expectedConfiguration' => new GpsConfiguration(
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
[],
[],
[],
['maxMessages' => 5, 'returnImmediately' => false]
),
],
'Custom subscription pull options configured through dsn #2' => [
'dsn' => 'gps://default?subscription[pull][maxMessages]=5',
'options' => [],
'expectedConfiguration' => new GpsConfiguration(
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
[],
[],
[],
['maxMessages' => 5, 'returnImmediately' => false]
),
],
'Custom subscription pull options configured through options #1' => [
'dsn' => 'gps://default',
'options' => [
'subscription' => [
'pull' => [
'maxMessages' => 5,
'returnImmediately' => true,
]
],
],
'expectedConfiguration' => new GpsConfiguration(
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
[],
[],
[],
['maxMessages' => 5, 'returnImmediately' => true]
),
],
'Custom subscription pull options configured through options #2' => [
'dsn' => 'gps://default',
'options' => [
'max_messages_pull' => 5,
'subscription' => [
'pull' => [
'returnImmediately' => true,
]
],
],
'expectedConfiguration' => new GpsConfiguration(
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
GpsConfigurationResolverInterface::DEFAULT_TOPIC_NAME,
[],
[],
[],
['maxMessages' => 5, 'returnImmediately' => true]
),
],
];
Expand Down
18 changes: 9 additions & 9 deletions Transport/GpsConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ final class GpsConfiguration implements GpsConfigurationInterface
{
private string $topicName;
private string $subscriptionName;
private int $maxMessagesPull;
private array $clientConfig;
private array $topicOptions;
private array $subscriptionOptions;
private array $subscriptionPullOptions;

public function __construct(
string $queueName,
string $subscriptionName,
int $maxMessagesPull,
array $clientConfig,
array $topicOptions,
array $subscriptionOptions
array $subscriptionOptions,
array $subscriptionPullOptions
) {
$this->topicName = $queueName;
$this->subscriptionName = $subscriptionName;
$this->maxMessagesPull = $maxMessagesPull;
$this->clientConfig = $clientConfig;
$this->topicOptions = $topicOptions;
$this->subscriptionOptions = $subscriptionOptions;
$this->subscriptionPullOptions = $subscriptionPullOptions;
}

public function getTopicName(): string
Expand All @@ -42,11 +42,6 @@ public function getSubscriptionName(): string
return $this->subscriptionName;
}

public function getMaxMessagesPull(): int
{
return $this->maxMessagesPull;
}

public function getClientConfig(): array
{
return $this->clientConfig;
Expand All @@ -61,4 +56,9 @@ public function getSubscriptionOptions(): array
{
return $this->subscriptionOptions;
}

public function getSubscriptionPullOptions(): array
{
return $this->subscriptionPullOptions;
}
}
7 changes: 5 additions & 2 deletions Transport/GpsConfigurationInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ public function getTopicName(): string;

public function getSubscriptionName(): string;

public function getMaxMessagesPull(): int;

/**
* @see PubSubClient constructor options
*/
Expand All @@ -33,4 +31,9 @@ public function getTopicOptions(): array;
* @see Subscription::create options
*/
public function getSubscriptionOptions(): array;

/**
* @see Subscription::pull options
*/
public function getSubscriptionPullOptions(): array;
}
72 changes: 63 additions & 9 deletions Transport/GpsConfigurationResolver.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ final class GpsConfigurationResolver implements GpsConfigurationResolverInterfac
self::INT_NORMALIZER_KEY => ['ackDeadlineSeconds', 'maxDeliveryAttempts'],
self::BOOL_NORMALIZER_KEY => ['enableMessageOrdering', 'retainAckedMessages', 'enableExactlyOnceDelivery'],
];
private const NORMALIZABLE_SUBSCRIPTION_PULL_OPTIONS = [
self::INT_NORMALIZER_KEY => ['maxMessages'],
self::BOOL_NORMALIZER_KEY => ['returnImmediately'],
];

/**
* {@inheritdoc}
Expand All @@ -42,6 +46,21 @@ public function resolve(string $dsn, array $options): GpsConfigurationInterface
return $data;
};

$subscriptionPullOptionsNormalizer = static function (Options $options, $data) {
foreach ($data ?? [] as $optionName => $optionValue) {
switch ($optionName) {
case \in_array($optionName, self::NORMALIZABLE_SUBSCRIPTION_PULL_OPTIONS[self::INT_NORMALIZER_KEY], true):
$data[$optionName] = (int) filter_var($optionValue, FILTER_SANITIZE_NUMBER_INT);
break;
case \in_array($optionName, self::NORMALIZABLE_SUBSCRIPTION_PULL_OPTIONS[self::BOOL_NORMALIZER_KEY], true):
$data[$optionName] = filter_var($optionValue, FILTER_VALIDATE_BOOLEAN);
break;
}
}

return $data;
};

$mergedOptions = $this->getMergedOptions($dsn, $options);

$optionsResolver = new OptionsResolver();
Expand All @@ -67,9 +86,25 @@ function (OptionsResolver $resolver, Options $parentOptions) use ($subscriptionO
)
;
}

if (isset($mergedOptions['max_messages_pull'])) {
$optionsResolver
->setDefault('max_messages_pull', self::DEFAULT_MAX_MESSAGES_PULL)
->setNormalizer('max_messages_pull', static function (Options $options, $value): ?int {
return ((int) filter_var($value, FILTER_SANITIZE_NUMBER_INT)) ?: null;
})
->setAllowedTypes('max_messages_pull', ['int', 'string'])
->setDeprecated(
'max_messages_pull',
'petitpress/gps-messenger-bundle',
'1.6.0',
'The option "max_messages_pull" is deprecated, use option "subscription.pull.maxMessages" instead.'
)
;
}

$optionsResolver
->setDefault('client_config', [])
->setDefault('max_messages_pull', self::DEFAULT_MAX_MESSAGES_PULL)
->setDefault('topic', function (OptionsResolver $topicResolver): void {
$topicResolver
->setDefault('name', self::DEFAULT_TOPIC_NAME)
Expand All @@ -80,13 +115,24 @@ function (OptionsResolver $resolver, Options $parentOptions) use ($subscriptionO
})
->setDefault(
'subscription',
function (OptionsResolver $resolver, Options $parentOptions) use ($subscriptionOptionsNormalizer): void {
function (OptionsResolver $resolver, Options $parentOptions) use ($subscriptionOptionsNormalizer, $subscriptionPullOptionsNormalizer): void {
if ($parentOptions->offsetExists('queue')) {
$resolver
->setDefault('name', $parentOptions['queue']['name'])
->setDefault('options', $parentOptions['queue']['options'])
->setDefault(
'pull',
function (OptionsResolver $pullResolver) use ($parentOptions): void {
$pullResolver
->setDefault('maxMessages', $parentOptions->offsetExists('max_messages_pull') ? $parentOptions['max_messages_pull'] : self::DEFAULT_MAX_MESSAGES_PULL)
->setDefault('returnImmediately', false)
;
}
)
->setAllowedTypes('name', 'string')
->setAllowedTypes('options', 'array')
->setAllowedTypes('pull', 'array')
->setNormalizer('pull', $subscriptionPullOptionsNormalizer)
;

return;
Expand All @@ -95,15 +141,23 @@ function (OptionsResolver $resolver, Options $parentOptions) use ($subscriptionO
$resolver
->setDefault('name', $parentOptions['topic']['name'])
->setDefault('options', [])
->setDefault(
'pull',
function (OptionsResolver $pullResolver) use ($parentOptions): void {
$pullResolver
->setDefault('maxMessages', $parentOptions->offsetExists('max_messages_pull') ? $parentOptions['max_messages_pull'] : self::DEFAULT_MAX_MESSAGES_PULL)
->setDefault('returnImmediately', false)
;
}
)
->setAllowedTypes('name', 'string')
->setAllowedTypes('options', 'array')
->setNormalizer('options', $subscriptionOptionsNormalizer);
->setAllowedTypes('pull', 'array')
->setNormalizer('options', $subscriptionOptionsNormalizer)
->setNormalizer('pull', $subscriptionPullOptionsNormalizer)
;
}
)
->setNormalizer('max_messages_pull', static function (Options $options, $value): ?int {
return ((int) filter_var($value, FILTER_SANITIZE_NUMBER_INT)) ?: null;
})
->setAllowedTypes('max_messages_pull', ['int', 'string'])
->setAllowedTypes('client_config', 'array')
;

Expand All @@ -112,10 +166,10 @@ function (OptionsResolver $resolver, Options $parentOptions) use ($subscriptionO
return new GpsConfiguration(
$resolvedOptions['topic']['name'],
$resolvedOptions['subscription']['name'],
$resolvedOptions['max_messages_pull'],
$resolvedOptions['client_config'],
$resolvedOptions['topic']['options'],
$resolvedOptions['subscription']['options']
$resolvedOptions['subscription']['options'],
$resolvedOptions['subscription']['pull']
);
}

Expand Down
2 changes: 1 addition & 1 deletion Transport/GpsReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function get(): iterable
try {
$messages = $this->pubSubClient
->subscription($this->gpsConfiguration->getSubscriptionName())
->pull(['maxMessages' => $this->gpsConfiguration->getMaxMessagesPull()]);
->pull($this->gpsConfiguration->getSubscriptionPullOptions());

foreach ($messages as $message) {
yield $this->createEnvelopeFromPubSubMessage($message);
Expand Down

0 comments on commit bf05d29

Please sign in to comment.