From 2bd1924794cc48c45b7e227b71d9335ba6adf7a0 Mon Sep 17 00:00:00 2001 From: Satoshi Matsumoto Date: Fri, 6 Jul 2018 18:07:20 +0100 Subject: [PATCH 1/3] Support cross-region SNS topic subscription --- .../barbeque/sns_subscriptions_controller.rb | 10 +++++++++- app/models/barbeque/sns_subscription.rb | 4 ++++ app/services/barbeque/sns_subscription_service.rb | 11 +++-------- lib/barbeque/config.rb | 3 ++- .../barbeque/job_definitions_controller_spec.rb | 2 +- .../barbeque/sns_subscriptions_controller_spec.rb | 2 +- spec/factories/sns_subscription.rb | 2 +- 7 files changed, 21 insertions(+), 13 deletions(-) diff --git a/app/controllers/barbeque/sns_subscriptions_controller.rb b/app/controllers/barbeque/sns_subscriptions_controller.rb index b886c30..d97ac3e 100644 --- a/app/controllers/barbeque/sns_subscriptions_controller.rb +++ b/app/controllers/barbeque/sns_subscriptions_controller.rb @@ -1,3 +1,5 @@ +require 'aws-sdk-sns' + class Barbeque::SnsSubscriptionsController < Barbeque::ApplicationController def index @sns_subscriptions = Barbeque::SNSSubscription.all @@ -44,6 +46,12 @@ def destroy private def fetch_sns_topic_arns - Barbeque::SNSSubscriptionService.sns_client.list_topics.flat_map(&:topics).map(&:topic_arn) + if Barbeque.config.sns_regions.empty? + Aws::SNS::Client.new.list_topics.flat_map(&:topics).map(&:topic_arn) + else + Barbeque.config.sns_regions.flat_map do |region| + Aws::SNS::Client.new(region: region).list_topics.flat_map(&:topics).map(&:topic_arn) + end + end end end diff --git a/app/models/barbeque/sns_subscription.rb b/app/models/barbeque/sns_subscription.rb index 7c004fb..81e11fc 100644 --- a/app/models/barbeque/sns_subscription.rb +++ b/app/models/barbeque/sns_subscription.rb @@ -7,5 +7,9 @@ class SNSSubscription < ApplicationRecord validates :topic_arn, uniqueness: { scope: :job_queue, message: 'should be set with only one queue' }, presence: true + + def region + /\Aarn:aws:sns:([a-z0-9-]+):/.match(topic_arn)[1] + end end end diff --git a/app/services/barbeque/sns_subscription_service.rb b/app/services/barbeque/sns_subscription_service.rb index 9f06dd0..e2d6f18 100644 --- a/app/services/barbeque/sns_subscription_service.rb +++ b/app/services/barbeque/sns_subscription_service.rb @@ -6,10 +6,6 @@ def self.sqs_client @sqs_client ||= Aws::SQS::Client.new end - def self.sns_client - @sns_client ||= Aws::SNS::Client.new - end - # @param [Barbeque::SNSSubscription] sns_subscription # @return [Boolean] `true` if succeeded to subscribe def subscribe(sns_subscription) @@ -45,10 +41,6 @@ def sqs_client Barbeque::SNSSubscriptionService.sqs_client end - def sns_client - Barbeque::SNSSubscriptionService.sns_client - end - # @param [Barbeque::SNSSubscription] sns_subscription def update_sqs_policy!(sns_subscription) attrs = sqs_client.get_queue_attributes( @@ -98,6 +90,7 @@ def subscribe_topic!(sns_subscription) ) queue_arn = sqs_attrs.attributes['QueueArn'] + sns_client = Aws::SNS::Client.new(region: sns_subscription.region) sns_client.subscribe( topic_arn: sns_subscription.topic_arn, protocol: 'sqs', @@ -113,6 +106,8 @@ def unsubscribe_topic!(sns_subscription) ) queue_arn = sqs_attrs.attributes['QueueArn'] + sns_client = Aws::SNS::Client.new(region: sns_subscription.region) + subscriptions = sns_client.list_subscriptions_by_topic( topic_arn: sns_subscription.topic_arn, ) diff --git a/lib/barbeque/config.rb b/lib/barbeque/config.rb index f8ab373..e10a9b9 100644 --- a/lib/barbeque/config.rb +++ b/lib/barbeque/config.rb @@ -3,7 +3,7 @@ module Barbeque class Config - attr_accessor :exception_handler, :executor, :executor_options, :sqs_receive_message_wait_time, :maximum_concurrent_executions, :runner_wait_seconds + attr_accessor :exception_handler, :executor, :executor_options, :sqs_receive_message_wait_time, :maximum_concurrent_executions, :runner_wait_seconds, :sns_regions def initialize(options = {}) options.each do |key, value| @@ -27,6 +27,7 @@ module ConfigBuilder # nil means unlimited 'maximum_concurrent_executions' => nil, 'runner_wait_seconds' => 10, + 'sns_regions' => [], } def config diff --git a/spec/controllers/barbeque/job_definitions_controller_spec.rb b/spec/controllers/barbeque/job_definitions_controller_spec.rb index 342ec28..9465b92 100644 --- a/spec/controllers/barbeque/job_definitions_controller_spec.rb +++ b/spec/controllers/barbeque/job_definitions_controller_spec.rb @@ -191,8 +191,8 @@ let(:subscription_arn) { 'arn:aws:sns:ap-northeast-1:012345678912:barbeque-spec:01234567-89ab-cdef-0123-456789abcdef' } before do - allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).and_return(sns_client) allow(Barbeque::SNSSubscriptionService).to receive(:sqs_client).and_return(sqs_client) + allow(Aws::SNS::Client).to receive(:new).with(region: 'ap-northeast-1').and_return(sns_client) allow(sqs_client).to receive(:get_queue_attributes). with(queue_url: sns_subscription.job_queue.queue_url, attribute_names: ['QueueArn']). diff --git a/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb b/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb index a4e9716..b5409a3 100644 --- a/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb +++ b/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb @@ -5,7 +5,7 @@ before do allow(Barbeque::SNSSubscriptionService).to receive(:sqs_client).and_return(sqs_client) - allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).and_return(sns_client) + allow(Aws::SNS::Client).to receive(:new).with(region: 'ap-northeast-1').and_return(sns_client) end describe '#create' do diff --git a/spec/factories/sns_subscription.rb b/spec/factories/sns_subscription.rb index 2d11706..45bbac5 100644 --- a/spec/factories/sns_subscription.rb +++ b/spec/factories/sns_subscription.rb @@ -1,6 +1,6 @@ FactoryBot.define do factory :sns_subscription, class: Barbeque::SNSSubscription do - sequence(:topic_arn) { |n| "arn:aws:sns:ap-northest-1:123456789012/Topic-#{n}" } + sequence(:topic_arn) { |n| "arn:aws:sns:ap-northeast-1:123456789012/Topic-#{n}" } job_queue job_definition end From cdd594df560704ebb55ba96312f98324c3844e28 Mon Sep 17 00:00:00 2001 From: Satoshi Matsumoto Date: Wed, 11 Jul 2018 06:32:58 +0100 Subject: [PATCH 2/3] Use #slice instead of #match --- app/models/barbeque/sns_subscription.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/barbeque/sns_subscription.rb b/app/models/barbeque/sns_subscription.rb index 81e11fc..9b0f635 100644 --- a/app/models/barbeque/sns_subscription.rb +++ b/app/models/barbeque/sns_subscription.rb @@ -9,7 +9,7 @@ class SNSSubscription < ApplicationRecord presence: true def region - /\Aarn:aws:sns:([a-z0-9-]+):/.match(topic_arn)[1] + topic_arn.slice(/\Aarn:aws:sns:([a-z0-9-]+):/, 1) end end end From f581ea66ac594d089ce1dd1a6908eb0c1d913416 Mon Sep 17 00:00:00 2001 From: Satoshi Matsumoto Date: Wed, 11 Jul 2018 07:12:02 +0100 Subject: [PATCH 3/3] Memoize Aws::SNS::Clinet.new --- .../barbeque/sns_subscriptions_controller.rb | 6 ++---- .../barbeque/sns_subscription_service.rb | 20 ++++++++++++------- .../job_definitions_controller_spec.rb | 2 +- .../sns_subscriptions_controller_spec.rb | 2 +- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/app/controllers/barbeque/sns_subscriptions_controller.rb b/app/controllers/barbeque/sns_subscriptions_controller.rb index d97ac3e..2abf5e5 100644 --- a/app/controllers/barbeque/sns_subscriptions_controller.rb +++ b/app/controllers/barbeque/sns_subscriptions_controller.rb @@ -1,5 +1,3 @@ -require 'aws-sdk-sns' - class Barbeque::SnsSubscriptionsController < Barbeque::ApplicationController def index @sns_subscriptions = Barbeque::SNSSubscription.all @@ -47,10 +45,10 @@ def destroy def fetch_sns_topic_arns if Barbeque.config.sns_regions.empty? - Aws::SNS::Client.new.list_topics.flat_map(&:topics).map(&:topic_arn) + Barbeque::SNSSubscriptionService.sns_client.list_topics.flat_map(&:topics).map(&:topic_arn) else Barbeque.config.sns_regions.flat_map do |region| - Aws::SNS::Client.new(region: region).list_topics.flat_map(&:topics).map(&:topic_arn) + Barbeque::SNSSubscriptionService.sns_client(region).list_topics.flat_map(&:topics).map(&:topic_arn) end end end diff --git a/app/services/barbeque/sns_subscription_service.rb b/app/services/barbeque/sns_subscription_service.rb index e2d6f18..dd3d9d9 100644 --- a/app/services/barbeque/sns_subscription_service.rb +++ b/app/services/barbeque/sns_subscription_service.rb @@ -6,6 +6,11 @@ def self.sqs_client @sqs_client ||= Aws::SQS::Client.new end + def self.sns_client(region = nil) + @sns_clients ||= {} + @sns_clients[region] ||= Aws::SNS::Client.new(region: region) + end + # @param [Barbeque::SNSSubscription] sns_subscription # @return [Boolean] `true` if succeeded to subscribe def subscribe(sns_subscription) @@ -38,7 +43,11 @@ def unsubscribe(sns_subscription) private def sqs_client - Barbeque::SNSSubscriptionService.sqs_client + self.class.sqs_client + end + + def sns_client(region) + self.class.sns_client(region) end # @param [Barbeque::SNSSubscription] sns_subscription @@ -90,8 +99,7 @@ def subscribe_topic!(sns_subscription) ) queue_arn = sqs_attrs.attributes['QueueArn'] - sns_client = Aws::SNS::Client.new(region: sns_subscription.region) - sns_client.subscribe( + sns_client(sns_subscription.region).subscribe( topic_arn: sns_subscription.topic_arn, protocol: 'sqs', endpoint: queue_arn @@ -106,15 +114,13 @@ def unsubscribe_topic!(sns_subscription) ) queue_arn = sqs_attrs.attributes['QueueArn'] - sns_client = Aws::SNS::Client.new(region: sns_subscription.region) - - subscriptions = sns_client.list_subscriptions_by_topic( + subscriptions = sns_client(sns_subscription.region).list_subscriptions_by_topic( topic_arn: sns_subscription.topic_arn, ) subscription_arn = subscriptions.subscriptions.find {|subscription| subscription.endpoint == queue_arn }.try!(:subscription_arn) if subscription_arn - sns_client.unsubscribe( + sns_client(sns_subscription.region).unsubscribe( subscription_arn: subscription_arn, ) end diff --git a/spec/controllers/barbeque/job_definitions_controller_spec.rb b/spec/controllers/barbeque/job_definitions_controller_spec.rb index 9465b92..d5a56d8 100644 --- a/spec/controllers/barbeque/job_definitions_controller_spec.rb +++ b/spec/controllers/barbeque/job_definitions_controller_spec.rb @@ -191,8 +191,8 @@ let(:subscription_arn) { 'arn:aws:sns:ap-northeast-1:012345678912:barbeque-spec:01234567-89ab-cdef-0123-456789abcdef' } before do + allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).with('ap-northeast-1').and_return(sns_client) allow(Barbeque::SNSSubscriptionService).to receive(:sqs_client).and_return(sqs_client) - allow(Aws::SNS::Client).to receive(:new).with(region: 'ap-northeast-1').and_return(sns_client) allow(sqs_client).to receive(:get_queue_attributes). with(queue_url: sns_subscription.job_queue.queue_url, attribute_names: ['QueueArn']). diff --git a/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb b/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb index b5409a3..43c2e70 100644 --- a/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb +++ b/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb @@ -5,7 +5,7 @@ before do allow(Barbeque::SNSSubscriptionService).to receive(:sqs_client).and_return(sqs_client) - allow(Aws::SNS::Client).to receive(:new).with(region: 'ap-northeast-1').and_return(sns_client) + allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).with('ap-northeast-1').and_return(sns_client) end describe '#create' do