From 075bd7787118a1a483befaff5ae790efb68e776f Mon Sep 17 00:00:00 2001 From: Aditya Date: Fri, 17 May 2024 12:26:39 +0530 Subject: [PATCH 1/2] feat: sqs event source mapping - maximum_concurrency --- chalice/app.py | 12 +++-- chalice/awsclient.py | 10 ++++ chalice/deploy/appgraph.py | 1 + chalice/deploy/models.py | 1 + chalice/deploy/planner.py | 12 +++-- chalice/package.py | 29 +++++++---- docs/source/api.rst | 5 +- tests/unit/deploy/test_planner.py | 81 ++++++++++++++++++++++++++++++- tests/unit/test_package.py | 59 ++++++++++++++++++++++ 9 files changed, 191 insertions(+), 19 deletions(-) diff --git a/chalice/app.py b/chalice/app.py index 4fe176bf0..fded00783 100644 --- a/chalice/app.py +++ b/chalice/app.py @@ -768,7 +768,8 @@ def on_sns_message(self, topic: str, def on_sqs_message(self, queue: Optional[str] = None, batch_size: int = 1, name: Optional[str] = None, queue_arn: Optional[str] = None, - maximum_batching_window_in_seconds: int = 0 + maximum_batching_window_in_seconds: int = 0, + maximum_concurrency: Optional[int] = None, ) -> Callable[..., Any]: return self._create_registration_function( handler_type='on_sqs_message', @@ -778,7 +779,8 @@ def on_sqs_message(self, queue: Optional[str] = None, batch_size: int = 1, 'queue_arn': queue_arn, 'batch_size': batch_size, 'maximum_batching_window_in_seconds': - maximum_batching_window_in_seconds + maximum_batching_window_in_seconds, + 'maximum_concurrency': maximum_concurrency, } ) @@ -1101,6 +1103,8 @@ def _register_on_sqs_message(self, name: str, batch_size=kwargs['batch_size'], maximum_batching_window_in_seconds=kwargs[ 'maximum_batching_window_in_seconds'], + maximum_concurrency=kwargs[ + 'maximum_concurrency'], ) self.event_sources.append(sqs_config) @@ -1625,13 +1629,15 @@ def __init__(self, name: str, handler_string: str, topic: str): class SQSEventConfig(BaseEventSourceConfig): def __init__(self, name: str, handler_string: str, queue: Optional[str], queue_arn: Optional[str], batch_size: int, - maximum_batching_window_in_seconds: int): + maximum_batching_window_in_seconds: int, + maximum_concurrency: Optional[int]): super(SQSEventConfig, self).__init__(name, handler_string) self.queue: Optional[str] = queue self.queue_arn: Optional[str] = queue_arn self.batch_size: int = batch_size self.maximum_batching_window_in_seconds: int = \ maximum_batching_window_in_seconds + self.maximum_concurrency: Optional[int] = maximum_concurrency class KinesisEventConfig(BaseEventSourceConfig): diff --git a/chalice/awsclient.py b/chalice/awsclient.py index 082f7a50d..e37bf7da5 100644 --- a/chalice/awsclient.py +++ b/chalice/awsclient.py @@ -1826,6 +1826,7 @@ def create_lambda_event_source( batch_size: int, starting_position: Optional[str] = None, maximum_batching_window_in_seconds: Optional[int] = 0, + maximum_concurrency: Optional[int] = None, ) -> None: lambda_client = self._client('lambda') batch_window = maximum_batching_window_in_seconds @@ -1835,6 +1836,10 @@ def create_lambda_event_source( 'BatchSize': batch_size, 'MaximumBatchingWindowInSeconds': batch_window, } + if maximum_concurrency: + kwargs['ScalingConfig'] = { + 'MaximumConcurrency': maximum_concurrency + } if starting_position is not None: kwargs['StartingPosition'] = starting_position return self._call_client_method_with_retries( @@ -1848,6 +1853,7 @@ def update_lambda_event_source( event_uuid: str, batch_size: int, maximum_batching_window_in_seconds: Optional[int] = 0, + maximum_concurrency: Optional[int] = None, ) -> None: lambda_client = self._client('lambda') batch_window = maximum_batching_window_in_seconds @@ -1856,6 +1862,10 @@ def update_lambda_event_source( 'BatchSize': batch_size, 'MaximumBatchingWindowInSeconds': batch_window, } + if maximum_concurrency: + kwargs['ScalingConfig'] = { + 'MaximumConcurrency': maximum_concurrency + } self._call_client_method_with_retries( lambda_client.update_event_source_mapping, kwargs, diff --git a/chalice/deploy/appgraph.py b/chalice/deploy/appgraph.py index f7e9bcb73..4572fffb2 100644 --- a/chalice/deploy/appgraph.py +++ b/chalice/deploy/appgraph.py @@ -664,6 +664,7 @@ def _create_sqs_subscription( batch_size=sqs_config.batch_size, lambda_function=lambda_function, maximum_batching_window_in_seconds=batch_window, + maximum_concurrency=sqs_config.maximum_concurrency ) return sqs_event_source diff --git a/chalice/deploy/models.py b/chalice/deploy/models.py index a7bbe3dea..887f5a120 100644 --- a/chalice/deploy/models.py +++ b/chalice/deploy/models.py @@ -348,6 +348,7 @@ class SQSEventSource(FunctionEventSubscriber): queue: Union[str, QueueARN] batch_size: int maximum_batching_window_in_seconds: int + maximum_concurrency: Opt[int] = None @dataclass diff --git a/chalice/deploy/planner.py b/chalice/deploy/planner.py index b7a96b769..986e16091 100644 --- a/chalice/deploy/planner.py +++ b/chalice/deploy/planner.py @@ -731,10 +731,13 @@ def _plan_sqseventsource(self, resource): return instruction_for_queue_arn + [ models.APICall( method_name='update_lambda_event_source', - params={'event_uuid': uuid, - 'batch_size': resource.batch_size, - 'maximum_batching_window_in_seconds': - resource.maximum_batching_window_in_seconds} + params={ + 'event_uuid': uuid, + 'batch_size': resource.batch_size, + 'maximum_batching_window_in_seconds': + resource.maximum_batching_window_in_seconds, + 'maximum_concurrency': resource.maximum_concurrency + } ) ] + self._batch_record_resource( 'sqs_event', resource.resource_name, { @@ -751,6 +754,7 @@ def _plan_sqseventsource(self, resource): 'batch_size': resource.batch_size, 'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds, + 'maximum_concurrency': resource.maximum_concurrency, 'function_name': function_arn}, output_var=uuid_varname, ), 'Subscribing %s to SQS queue %s\n' diff --git a/chalice/package.py b/chalice/package.py index b8df388c4..06c4e01d6 100644 --- a/chalice/package.py +++ b/chalice/package.py @@ -659,15 +659,20 @@ def _generate_sqseventsource(self, resource, template): 'Fn::Sub': ('arn:${AWS::Partition}:sqs:${AWS::Region}' ':${AWS::AccountId}:%s' % resource.queue) } + properties = { + 'Queue': queue, + 'BatchSize': resource.batch_size, + 'MaximumBatchingWindowInSeconds': + resource.maximum_batching_window_in_seconds + } + if resource.maximum_concurrency: + properties["ScalingConfig"] = { + "MaximumConcurrency": resource.maximum_concurrency + } function_cfn['Properties']['Events'] = { sqs_cfn_name: { 'Type': 'SQS', - 'Properties': { - 'Queue': queue, - 'BatchSize': resource.batch_size, - 'MaximumBatchingWindowInSeconds': - resource.maximum_batching_window_in_seconds, - } + 'Properties': properties } } @@ -1123,14 +1128,20 @@ def _generate_sqseventsource(self, resource, template): ":%(account_id)s:%(queue)s", queue=resource.queue ) - template['resource'].setdefault('aws_lambda_event_source_mapping', {})[ - resource.resource_name] = { + + aws_lambda_event_source_mapping = { 'event_source_arn': event_source_arn, 'batch_size': resource.batch_size, 'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds, - 'function_name': self._fref(resource.lambda_function) + 'function_name': self._fref(resource.lambda_function), } + if resource.maximum_concurrency: + aws_lambda_event_source_mapping["scaling_config"] = { + "maximum_concurrency": resource.maximum_concurrency + } + template['resource'].setdefault('aws_lambda_event_source_mapping', {})[ + resource.resource_name] = aws_lambda_event_source_mapping def _generate_kinesiseventsource(self, resource, template): # type: (models.KinesisEventSource, Dict[str, Any]) -> None diff --git a/docs/source/api.rst b/docs/source/api.rst index a90ab0d92..b9ca7613b 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -282,7 +282,7 @@ Chalice entire lambda function name. This parameter is optional. If it is not provided, the name of the python function will be used. - .. method:: on_sqs_message(queue, batch_size=1, name=None, queue_arn=None, maximum_batching_window_in_seconds=0) + .. method:: on_sqs_message(queue, batch_size=1, name=None, queue_arn=None, maximum_batching_window_in_seconds=0, maximum_concurrency=None) Create a lambda function and configure it to be automatically invoked whenever a message is published to the specified SQS queue. @@ -337,6 +337,9 @@ Chalice :param maximum_batching_window_in_seconds: The maximum amount of time, in seconds, to gather records before invoking the function. + :param maximum_concurrency: The maximum number of concurrent functions + that the event source can invoke. + .. method:: on_kinesis_record(stream, batch_size=100, starting_position='LATEST', name=None, maximum_batching_window_in_seconds=0) Create a lambda function and configure it to be automatically invoked diff --git a/tests/unit/deploy/test_planner.py b/tests/unit/deploy/test_planner.py index 4f4817560..d0d934b63 100644 --- a/tests/unit/deploy/test_planner.py +++ b/tests/unit/deploy/test_planner.py @@ -1781,7 +1781,8 @@ def test_can_plan_sqs_event_source(self): ), 'batch_size': 10, 'maximum_batching_window_in_seconds': 60, - 'function_name': Variable("function_name_lambda_arn") + 'function_name': Variable("function_name_lambda_arn"), + 'maximum_concurrency': None }, output_var='function_name-sqs-event-source_uuid' ) @@ -1819,7 +1820,8 @@ def test_sqs_event_supports_queue_arn(self): ), 'batch_size': 10, 'maximum_batching_window_in_seconds': 0, - 'function_name': Variable("function_name_lambda_arn") + 'function_name': Variable("function_name_lambda_arn"), + 'maximum_concurrency': None, }, output_var='function_name-sqs-event-source_uuid' ) @@ -1863,6 +1865,7 @@ def test_can_update_sqs_event_with_queue_arn(self): 'event_uuid': 'my-uuid', 'batch_size': 10, 'maximum_batching_window_in_seconds': 0, + 'maximum_concurrency': None, }, ) self.assert_recorded_values( @@ -1925,6 +1928,80 @@ def test_sqs_event_source_exists_updates_batch_size(self): 'event_uuid': 'my-uuid', 'batch_size': 10, 'maximum_batching_window_in_seconds': 0, + 'maximum_concurrency': None, + }, + ) + self.assert_recorded_values( + plan, 'sqs_event', 'function_name-sqs-event-source', { + 'queue_arn': 'arn:sqs:myqueue', + 'event_uuid': 'my-uuid', + 'queue': 'myqueue', + 'lambda_arn': 'arn:lambda' + } + ) + + def test_sqs_event_supports_maximum_concurrency(self): + function = create_function_resource('function_name') + sqs_event_source = models.SQSEventSource( + resource_name='function_name-sqs-event-source', + queue=models.QueueARN(arn='arn:us-west-2:myqueue'), + batch_size=10, + lambda_function=function, + maximum_batching_window_in_seconds=0, + maximum_concurrency=2 + ) + plan = self.determine_plan(sqs_event_source) + assert plan[1] == models.APICall( + method_name='create_lambda_event_source', + params={ + 'event_source_arn': Variable( + "function_name-sqs-event-source_queue_arn" + ), + 'batch_size': 10, + 'maximum_batching_window_in_seconds': 0, + 'function_name': Variable("function_name_lambda_arn"), + 'maximum_concurrency': 2, + }, + output_var='function_name-sqs-event-source_uuid' + ) + self.assert_recorded_values( + plan, 'sqs_event', 'function_name-sqs-event-source', { + 'queue_arn': Variable( + 'function_name-sqs-event-source_queue_arn'), + 'event_uuid': Variable( + 'function_name-sqs-event-source_uuid'), + 'queue': 'myqueue', + 'lambda_arn': Variable( + 'function_name_lambda_arn') + } + ) + + def test_sqs_event_source_exists_updates_maximum_concurrency(self): + function = create_function_resource('function_name') + sqs_event_source = models.SQSEventSource( + resource_name='function_name-sqs-event-source', + queue='myqueue', + batch_size=10, + lambda_function=function, + maximum_batching_window_in_seconds=0, + maximum_concurrency=2 + ) + self.remote_state.declare_resource_exists( + sqs_event_source, + queue='myqueue', + queue_arn='arn:sqs:myqueue', + resource_type='sqs_event', + lambda_arn='arn:lambda', + event_uuid='my-uuid', + ) + plan = self.determine_plan(sqs_event_source) + assert plan[5] == models.APICall( + method_name='update_lambda_event_source', + params={ + 'event_uuid': 'my-uuid', + 'batch_size': 10, + 'maximum_batching_window_in_seconds': 0, + 'maximum_concurrency': 2, }, ) self.assert_recorded_values( diff --git a/tests/unit/test_package.py b/tests/unit/test_package.py index d21fb032b..aed1698ee 100644 --- a/tests/unit/test_package.py +++ b/tests/unit/test_package.py @@ -746,6 +746,34 @@ def handler(event): 'maximum_batching_window_in_seconds': 0 } + def test_can_package_sqs_handler_with_max_concurrency(self, sample_app): + @sample_app.on_sqs_message( + queue='foo', + batch_size=5, + maximum_concurrency=2 + ) + def handler(event): + pass + + config = Config.create(chalice_app=sample_app, + project_dir='.', + app_name='sample_app', + api_gateway_stage='api') + template = self.generate_template(config) + + assert template['resource'][ + 'aws_lambda_event_source_mapping'][ + 'handler-sqs-event-source'] == { + 'event_source_arn': ( + 'arn:${data.aws_partition.chalice.partition}:sqs' + ':${data.aws_region.chalice.name}:' + '${data.aws_caller_identity.chalice.account_id}:foo'), + 'function_name': '${aws_lambda_function.handler.arn}', + 'batch_size': 5, + 'maximum_batching_window_in_seconds': 0, + 'scaling_config': {'maximum_concurrency': 2} + } + def test_sqs_arn_does_not_use_fn_sub(self, sample_app): @sample_app.on_sqs_message(queue_arn='arn:foo:bar', batch_size=5) def handler(event): @@ -1736,6 +1764,37 @@ def handler(event): } } + def test_can_package_sqs_handler_with_max_concurrency(self, sample_app): + @sample_app.on_sqs_message( + queue='foo', + batch_size=5, + maximum_concurrency=2 + ) + def handler(event): + pass + + config = Config.create(chalice_app=sample_app, + project_dir='.', + api_gateway_stage='api') + template = self.generate_template(config) + sns_handler = template['Resources']['Handler'] + assert sns_handler['Properties']['Events'] == { + 'HandlerSqsEventSource': { + 'Type': 'SQS', + 'Properties': { + 'Queue': { + 'Fn::Sub': ( + 'arn:${AWS::Partition}:sqs:${AWS::Region}' + ':${AWS::AccountId}:foo' + ) + }, + 'BatchSize': 5, + 'MaximumBatchingWindowInSeconds': 0, + 'ScalingConfig': {'MaximumConcurrency': 2} + }, + } + } + def test_sqs_arn_does_not_use_fn_sub(self, sample_app): @sample_app.on_sqs_message(queue_arn='arn:foo:bar', batch_size=5) def handler(event): From 8303c243ebe41ad743d4e874ccfc9ff960eee782 Mon Sep 17 00:00:00 2001 From: James Saryerwinnie Date: Mon, 3 Jun 2024 12:07:41 -0400 Subject: [PATCH 2/2] Add changelog entry for #2104 --- .../next-release/104765404384083-enhancement-SQS-98964.json | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changes/next-release/104765404384083-enhancement-SQS-98964.json diff --git a/.changes/next-release/104765404384083-enhancement-SQS-98964.json b/.changes/next-release/104765404384083-enhancement-SQS-98964.json new file mode 100644 index 000000000..af6ef892c --- /dev/null +++ b/.changes/next-release/104765404384083-enhancement-SQS-98964.json @@ -0,0 +1,5 @@ +{ + "type": "enhancement", + "category": "SQS", + "description": "Add configuration option for MaximumConcurrency for SQS event source (#2104)" +}