Skip to content

Commit

Permalink
Cache broadcast's audio file (#1771)
Browse files Browse the repository at this point in the history
  • Loading branch information
samnang authored Mar 2, 2025
1 parent 2e8e6b0 commit 0be35b6
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 14 deletions.
24 changes: 22 additions & 2 deletions app/models/broadcast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Broadcast < ApplicationRecord

module ActiveStorageDirty
attr_reader :audio_file_blob_was, :audio_file_will_change
attr_accessor :cache_audio_file_from_audio_url

def audio_file=(attachable)
@audio_file_blob_was = audio_file.blob if audio_file.attached?
Expand Down Expand Up @@ -74,16 +75,25 @@ def audio_file_blob_changed?

aasm column: :status, whiny_transitions: false do
state :pending, initial: true
state :errored
state :queued
state :running
state :stopped
state :completed

event :error do
transitions(
from: [ :pending, :queued ],
to: :errored
)
end

# TODO: Remove state transition from pending after we removed the old API
event :start do
transitions(
from: [ :pending, :queued ],
to: :running
from: [ :pending, :queued, :errored ],
to: :running,
before_transaction: -> { self.error_message = nil }
)
end

Expand Down Expand Up @@ -132,6 +142,15 @@ def updatable?
status == "pending"
end

def mark_as_errored!(message)
self.error_message = message
self.error!
end

def not_yet_started?
pending? || queued? || errored?
end

private

def set_call_flow_logic
Expand All @@ -143,6 +162,7 @@ def set_call_flow_logic
def process_audio_file
return unless audio_file.attached?
return unless audio_file_blob_changed?
return if cache_audio_file_from_audio_url

AudioFileProcessorJob.perform_later(self)
end
Expand Down
18 changes: 16 additions & 2 deletions app/request_schemas/v1/update_broadcast_request_schema.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module V1
class UpdateBroadcastRequestSchema < JSONAPIRequestSchema
STATES = Broadcast.aasm.states.map { _1.name.to_s } - [ "queued" ]
STATES = Broadcast.aasm.states.map { _1.name.to_s } - [ "queued", "errored" ]

params do
required(:data).value(:hash).schema do
Expand All @@ -18,6 +18,20 @@ class UpdateBroadcastRequestSchema < JSONAPIRequestSchema
attribute_rule(:beneficiary_filter).validate(contract: BeneficiaryFilter)
attribute_rule(:audio_url).validate(:url_format)

attribute_rule(:beneficiary_filter) do
next unless key?
next if resource.not_yet_started?

key.failure("does not allow to update after broadcast started")
end

attribute_rule(:audio_url) do
next unless key?
next if resource.not_yet_started?

key.failure("does not allow to update after broadcast started")
end

attribute_rule(:status) do
next unless key?

Expand All @@ -32,7 +46,7 @@ class UpdateBroadcastRequestSchema < JSONAPIRequestSchema
def output
result = super

if result[:status] == "running" && resource.pending?
if result[:status] == "running" && (resource.pending? || resource.errored?)
result[:status] = "queued"
end

Expand Down
18 changes: 18 additions & 0 deletions app/workflows/download_audio_file.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class DownloadAudioFile < ApplicationWorkflow
attr_reader :broadcast

def initialize(broadcast)
@broadcast = broadcast
end

def call
uri = URI.parse(broadcast.audio_url)
broadcast.cache_audio_file_from_audio_url = true
broadcast.audio_file.attach(
io: URI.open(uri),
filename: File.basename(uri)
)
rescue OpenURI::HTTPError, URI::InvalidURIError
broadcast.mark_as_errored!("Unable to download audio file")
end
end
17 changes: 16 additions & 1 deletion app/workflows/populate_alerts.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class PopulateAlerts < ApplicationWorkflow
attr_reader :broadcast

class BroadcastStartedError < StandardError; end

delegate :account, :beneficiary_filter, to: :broadcast, private: true

def initialize(broadcast)
Expand All @@ -9,17 +11,30 @@ def initialize(broadcast)

def call
ApplicationRecord.transaction do
download_audio_file unless broadcast.audio_file.attached?
return if broadcast.errored?

create_alerts
create_delivery_attempts

broadcast.error_message = nil
broadcast.start!
end
rescue BroadcastStartedError => e
broadcast.mark_as_errored!(e.message)
end

private

def download_audio_file
DownloadAudioFile.call(broadcast)
end

def create_alerts
alerts = beneficiaries_scope.find_each.map do |beneficiary|
beneficiaries = beneficiaries_scope
raise BroadcastStartedError, "No beneficiaries match the filters" if beneficiaries.none?

alerts = beneficiaries.find_each.map do |beneficiary|
{
broadcast_id: broadcast.id,
beneficiary_id: beneficiary.id,
Expand Down
5 changes: 5 additions & 0 deletions db/migrate/20250228073428_add_error_message_to_broadcasts.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddErrorMessageToBroadcasts < ActiveRecord::Migration[8.0]
def change
add_column :broadcasts, :error_message, :string
end
end
3 changes: 2 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[8.0].define(version: 2025_02_21_100342) do
ActiveRecord::Schema[8.0].define(version: 2025_02_28_073428) do
# These are extensions that must be enabled in order to support this database
enable_extension "citext"
enable_extension "pg_catalog.plpgsql"
Expand Down Expand Up @@ -148,6 +148,7 @@
t.bigint "created_by_id"
t.string "channel", null: false
t.jsonb "beneficiary_filter", default: {}, null: false
t.string "error_message"
t.index ["account_id"], name: "index_broadcasts_on_account_id"
t.index ["created_by_id"], name: "index_broadcasts_on_created_by_id"
t.index ["status"], name: "index_broadcasts_on_status"
Expand Down
22 changes: 22 additions & 0 deletions spec/models/broadcast_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@

expect(broadcast.audio_file_blob_changed?).to eq(false)
end

it "enqueues a job to process the audio file" do
broadcast = create(:broadcast)

broadcast.audio_file = fixture_file_upload("test.mp3", "audio/mp3")
result = broadcast.save

expect(result).to eq(true)
expect(AudioFileProcessorJob).to have_been_enqueued.with(broadcast)
end
end

describe "state_machine" do
Expand Down Expand Up @@ -112,4 +122,16 @@ def assert_transitions!
it { assert_transitions! }
end
end

describe "#not_yet_started?" do
it "returns true if the broadcast is pending, queued, or errored" do
expect(build_stubbed(:broadcast, status: :pending).not_yet_started?).to be(true)
expect(build_stubbed(:broadcast, status: :queued).not_yet_started?).to be(true)
expect(build_stubbed(:broadcast, status: :errored).not_yet_started?).to be(true)

expect(build_stubbed(:broadcast, status: :running).not_yet_started?).to be(false)
expect(build_stubbed(:broadcast, status: :stopped).not_yet_started?).to be(false)
expect(build_stubbed(:broadcast, status: :completed).not_yet_started?).to be(false)
end
end
end
66 changes: 62 additions & 4 deletions spec/request_schemas/v1/update_broadcast_request_schema_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
module V1
RSpec.describe UpdateBroadcastRequestSchema, type: :request_schema do
it "validates the audio_url" do
broadcast = create(:broadcast)
broadcast = create(:broadcast, status: :pending)
started_broadcast = create(:broadcast, status: :running)

expect(
validate_schema(input_params: { data: { attributes: {} } }, options: { resource: broadcast })
Expand All @@ -16,9 +17,31 @@ module V1
expect(
validate_schema(input_params: { data: { attributes: { audio_url: "http://example.com/sample.mp3" } } }, options: { resource: broadcast })
).to have_valid_field(:data, :attributes, :audio_url)

expect(
validate_schema(input_params: { data: { attributes: { audio_url: "http://example.com/sample.mp3" } } }, options: { resource: started_broadcast })
).not_to have_valid_field(:data, :attributes, :audio_url)
end

it "validates the beneficiary_filter" do
broadcast = create(:broadcast, status: :pending)
started_broadcast = create(:broadcast, status: :running)

expect(
validate_schema(input_params: { data: { attributes: {} } }, options: { resource: broadcast })
).to have_valid_field(:data, :attributes, :beneficiary_filter)

expect(
validate_schema(input_params: { data: { attributes: { beneficiary_filter: { status: { eq: "active" } } } } }, options: { resource: broadcast })
).to have_valid_field(:data, :attributes, :beneficiary_filter)

expect(
validate_schema(input_params: { data: { attributes: { beneficiary_filter: { status: { eq: "active" } } } } }, options: { resource: started_broadcast })
).not_to have_valid_field(:data, :attributes, :beneficiary_filter)
end

it "validates the status" do
errored_broadcast = create(:broadcast, status: :errored)
pending_broadcast = create(:broadcast, status: :pending)
running_broadcast = create(:broadcast, status: :running)
stopped_broadcast = create(:broadcast, status: :stopped)
Expand Down Expand Up @@ -67,22 +90,57 @@ module V1
expect(
validate_schema(input_params: { data: { attributes: { status: "queued" } } }, options: { resource: pending_broadcast })
).not_to have_valid_field(:data, :attributes, :status)

expect(
validate_schema(input_params: { data: { attributes: { status: "running" } } }, options: { resource: errored_broadcast })
).to have_valid_field(:data, :attributes, :status)

expect(
validate_schema(input_params: { data: { attributes: { status: "stopped" } } }, options: { resource: errored_broadcast })
).not_to have_valid_field(:data, :attributes, :status)

expect(
validate_schema(input_params: { data: { attributes: { status: "queued" } } }, options: { resource: errored_broadcast })
).not_to have_valid_field(:data, :attributes, :status)

expect(
validate_schema(input_params: { data: { attributes: { status: "completed" } } }, options: { resource: errored_broadcast })
).not_to have_valid_field(:data, :attributes, :status)
end

it "handles post processing" do
broadcast = create(:broadcast, status: :pending)
pending_broadcast = create(:broadcast, status: :pending)
errored_broadcast = create(:broadcast, status: :errored)

result = validate_schema(
input_params: {
data: {
id: pending_broadcast.id,
attributes: {
status: "running",
audio_url: "http://example.com/sample.mp3"
}
}
},
options: { resource: pending_broadcast }
).output

expect(result).to include(
status: "queued",
audio_url: "http://example.com/sample.mp3"
)

result = validate_schema(
input_params: {
data: {
id: broadcast.id,
id: errored_broadcast.id,
attributes: {
status: "running",
audio_url: "http://example.com/sample.mp3"
}
}
},
options: { resource: broadcast }
options: { resource: errored_broadcast }
).output

expect(result).to include(
Expand Down
11 changes: 7 additions & 4 deletions spec/requests/open_ews_api/v1/broadcasts_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@
}
)

stub_request(:get, "https://www.example.com/test.mp3")
.to_return(status: 200, body: file_fixture("test.mp3"))

set_authorization_header_for(account)
perform_enqueued_jobs do
do_request(
Expand All @@ -110,7 +113,7 @@
type: :broadcast,
attributes: {
status: "running",
audio_url: "https://www.example.com/sample.mp3",
audio_url: "https://www.example.com/test.mp3",
beneficiary_filter: {
gender: { eq: "F" }
}
Expand All @@ -123,12 +126,13 @@
expect(response_body).to match_jsonapi_resource_schema("broadcast")
expect(json_response.dig("data", "attributes")).to include(
"status" => "queued",
"audio_url" => "https://www.example.com/sample.mp3",
"audio_url" => "https://www.example.com/test.mp3",
"beneficiary_filter" => {
"gender" => { "eq" => "F" }
}
)
expect(broadcast.reload.status).to eq("running")
expect(broadcast.audio_file).to be_attached
expect(broadcast.beneficiaries).to match_array([ female_beneficiary ])
expect(broadcast.delivery_attempts.count).to eq(1)
expect(broadcast.delivery_attempts.first.beneficiary).to eq(female_beneficiary)
Expand All @@ -149,8 +153,7 @@
id: broadcast.id,
type: :broadcast,
attributes: {
status: "pending",
audio_url: "https://www.example.com/sample.mp3"
status: "pending"
}
}
)
Expand Down
Loading

0 comments on commit 0be35b6

Please sign in to comment.