diff --git a/app/controllers/barbeque/sns_subscriptions_controller.rb b/app/controllers/barbeque/sns_subscriptions_controller.rb index b886c30..2abf5e5 100644 --- a/app/controllers/barbeque/sns_subscriptions_controller.rb +++ b/app/controllers/barbeque/sns_subscriptions_controller.rb @@ -44,6 +44,12 @@ def destroy private def fetch_sns_topic_arns - Barbeque::SNSSubscriptionService.sns_client.list_topics.flat_map(&:topics).map(&:topic_arn) + if Barbeque.config.sns_regions.empty? + Barbeque::SNSSubscriptionService.sns_client.list_topics.flat_map(&:topics).map(&:topic_arn) + else + Barbeque.config.sns_regions.flat_map do |region| + Barbeque::SNSSubscriptionService.sns_client(region).list_topics.flat_map(&:topics).map(&:topic_arn) + end + end end end diff --git a/app/models/barbeque/sns_subscription.rb b/app/models/barbeque/sns_subscription.rb index 7c004fb..9b0f635 100644 --- a/app/models/barbeque/sns_subscription.rb +++ b/app/models/barbeque/sns_subscription.rb @@ -7,5 +7,9 @@ class SNSSubscription < ApplicationRecord validates :topic_arn, uniqueness: { scope: :job_queue, message: 'should be set with only one queue' }, presence: true + + def region + topic_arn.slice(/\Aarn:aws:sns:([a-z0-9-]+):/, 1) + end end end diff --git a/app/services/barbeque/sns_subscription_service.rb b/app/services/barbeque/sns_subscription_service.rb index 9f06dd0..dd3d9d9 100644 --- a/app/services/barbeque/sns_subscription_service.rb +++ b/app/services/barbeque/sns_subscription_service.rb @@ -6,8 +6,9 @@ def self.sqs_client @sqs_client ||= Aws::SQS::Client.new end - def self.sns_client - @sns_client ||= Aws::SNS::Client.new + def self.sns_client(region = nil) + @sns_clients ||= {} + @sns_clients[region] ||= Aws::SNS::Client.new(region: region) end # @param [Barbeque::SNSSubscription] sns_subscription @@ -42,11 +43,11 @@ def unsubscribe(sns_subscription) private def sqs_client - Barbeque::SNSSubscriptionService.sqs_client + self.class.sqs_client end - def sns_client - Barbeque::SNSSubscriptionService.sns_client + def sns_client(region) + self.class.sns_client(region) end # @param [Barbeque::SNSSubscription] sns_subscription @@ -98,7 +99,7 @@ def subscribe_topic!(sns_subscription) ) queue_arn = sqs_attrs.attributes['QueueArn'] - sns_client.subscribe( + sns_client(sns_subscription.region).subscribe( topic_arn: sns_subscription.topic_arn, protocol: 'sqs', endpoint: queue_arn @@ -113,13 +114,13 @@ def unsubscribe_topic!(sns_subscription) ) queue_arn = sqs_attrs.attributes['QueueArn'] - subscriptions = sns_client.list_subscriptions_by_topic( + subscriptions = sns_client(sns_subscription.region).list_subscriptions_by_topic( topic_arn: sns_subscription.topic_arn, ) subscription_arn = subscriptions.subscriptions.find {|subscription| subscription.endpoint == queue_arn }.try!(:subscription_arn) if subscription_arn - sns_client.unsubscribe( + sns_client(sns_subscription.region).unsubscribe( subscription_arn: subscription_arn, ) end diff --git a/lib/barbeque/config.rb b/lib/barbeque/config.rb index f8ab373..e10a9b9 100644 --- a/lib/barbeque/config.rb +++ b/lib/barbeque/config.rb @@ -3,7 +3,7 @@ module Barbeque class Config - attr_accessor :exception_handler, :executor, :executor_options, :sqs_receive_message_wait_time, :maximum_concurrent_executions, :runner_wait_seconds + attr_accessor :exception_handler, :executor, :executor_options, :sqs_receive_message_wait_time, :maximum_concurrent_executions, :runner_wait_seconds, :sns_regions def initialize(options = {}) options.each do |key, value| @@ -27,6 +27,7 @@ module ConfigBuilder # nil means unlimited 'maximum_concurrent_executions' => nil, 'runner_wait_seconds' => 10, + 'sns_regions' => [], } def config diff --git a/spec/controllers/barbeque/job_definitions_controller_spec.rb b/spec/controllers/barbeque/job_definitions_controller_spec.rb index 342ec28..d5a56d8 100644 --- a/spec/controllers/barbeque/job_definitions_controller_spec.rb +++ b/spec/controllers/barbeque/job_definitions_controller_spec.rb @@ -191,7 +191,7 @@ let(:subscription_arn) { 'arn:aws:sns:ap-northeast-1:012345678912:barbeque-spec:01234567-89ab-cdef-0123-456789abcdef' } before do - allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).and_return(sns_client) + allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).with('ap-northeast-1').and_return(sns_client) allow(Barbeque::SNSSubscriptionService).to receive(:sqs_client).and_return(sqs_client) allow(sqs_client).to receive(:get_queue_attributes). diff --git a/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb b/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb index a4e9716..43c2e70 100644 --- a/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb +++ b/spec/controllers/barbeque/sns_subscriptions_controller_spec.rb @@ -5,7 +5,7 @@ before do allow(Barbeque::SNSSubscriptionService).to receive(:sqs_client).and_return(sqs_client) - allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).and_return(sns_client) + allow(Barbeque::SNSSubscriptionService).to receive(:sns_client).with('ap-northeast-1').and_return(sns_client) end describe '#create' do diff --git a/spec/factories/sns_subscription.rb b/spec/factories/sns_subscription.rb index 2d11706..45bbac5 100644 --- a/spec/factories/sns_subscription.rb +++ b/spec/factories/sns_subscription.rb @@ -1,6 +1,6 @@ FactoryBot.define do factory :sns_subscription, class: Barbeque::SNSSubscription do - sequence(:topic_arn) { |n| "arn:aws:sns:ap-northest-1:123456789012/Topic-#{n}" } + sequence(:topic_arn) { |n| "arn:aws:sns:ap-northeast-1:123456789012/Topic-#{n}" } job_queue job_definition end