From 23b165e01ae885903dfc23d6ac4e9243fa026f45 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 15 Apr 2024 19:03:51 -0600 Subject: [PATCH 01/22] Initial draft of fetching all replies on context load --- app/controllers/api/v1/statuses_controller.rb | 11 +++++++++ .../activitypub/fetch_replies_service.rb | 23 ++++++++++++++----- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 2593ef7da582b3..0e8d5cdc829432 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -47,6 +47,17 @@ def context ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT + else + collection = @status['replies'] + + unless collection.nil? && @status.local? + ActivityPub::FetchRepliesService.new.call( + value_or_id(@status), + value_or_id(collection), + allow_synchronous_requests: true, + all_replies: true + ) + end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 46cab6caf93dc3..f2fa1c2596fa74 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -3,7 +3,14 @@ class ActivityPub::FetchRepliesService < BaseService include JsonLdHelper - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil) + # Limit of fetched replies used when not fetching all replies + MAX_REPLIES_LOW = 5 + + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) + # Whether we are getting replies from more than the originating server, + # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` + @all_replies = all_replies + @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests @@ -35,7 +42,7 @@ def collection_items(collection_or_uri) def fetch_collection(collection_or_uri) return collection_or_uri if collection_or_uri.is_a?(Hash) return unless @allow_synchronous_requests - return if non_matching_uri_hosts?(@account.uri, collection_or_uri) + return if !@all_replies && non_matching_uri_hosts?(@account.uri, collection_or_uri) # NOTE: For backward compatibility reasons, Mastodon signs outgoing # queries incorrectly by default. @@ -54,10 +61,14 @@ def fetch_collection(collection_or_uri) end def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. + if @all_replies + @items.map { |item| value_or_id(item) } + else + # Only fetch replies to the same server as the original status to avoid + # amplification attacks. - # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(5) + # Also limit to 5 fetched replies to limit potential for DoS. + @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW) + end end end From e53938f91186fe6c467856303af63d2a4c422e00 Mon Sep 17 00:00:00 2001 From: jonny Date: Wed, 8 May 2024 01:55:55 -0700 Subject: [PATCH 02/22] committing all ugly with a bunch of logger calls in the middle but we are almost there baby --- app/controllers/api/v1/statuses_controller.rb | 28 +++++++++++++------ .../activitypub/fetch_replies_service.rb | 11 ++++++++ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 0e8d5cdc829432..37bdcba36a11d2 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -2,6 +2,7 @@ class Api::V1::StatusesController < Api::BaseController include Authorization + include JsonLdHelper before_action -> { authorize_if_got_token! :read, :'read:statuses' }, except: [:create, :update, :destroy] before_action -> { doorkeeper_authorize! :write, :'write:statuses' }, only: [:create, :update, :destroy] @@ -48,15 +49,24 @@ def context descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT else - collection = @status['replies'] - - unless collection.nil? && @status.local? - ActivityPub::FetchRepliesService.new.call( - value_or_id(@status), - value_or_id(collection), - allow_synchronous_requests: true, - all_replies: true - ) + unless @status.local? + json_status = fetch_resource(@status.uri, true, @current_account) + + logger.warn "json status" + logger.warn json_status + # rescue this whole block on failure, don't want to fail the whole context request if we can't do this + collection = json_status['replies'] + logger.warn "replies uri" + logger.warn collection + + unless collection.nil? + ActivityPub::FetchRepliesService.new.call( + @status, + collection, + allow_synchronous_requests: true, + all_replies: true + ) + end end end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index f2fa1c2596fa74..c516fe759d8a8f 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -15,6 +15,9 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @allow_synchronous_requests = allow_synchronous_requests @items = collection_items(collection_or_uri) + logger = Logger.new(STDOUT) + logger.warn 'collection items' + logger.warn @items return if @items.nil? FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } @@ -25,12 +28,20 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req private def collection_items(collection_or_uri) + logger = Logger.new(STDOUT) collection = fetch_collection(collection_or_uri) + logger.warn 'first collection' + logger.warn collection return unless collection.is_a?(Hash) collection = fetch_collection(collection['first']) if collection['first'].present? + logger.warn 'second collection' + logger.warn collection return unless collection.is_a?(Hash) + # Need to do another "next" here. see https://neuromatch.social/users/jonny/statuses/112401738180959195/replies for example + # then we are home free (stopping for tonight tho.) + case collection['type'] when 'Collection', 'CollectionPage' as_array(collection['items']) From 5bb473d771043068a15891578cacc82138142ba6 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 19:42:59 -0700 Subject: [PATCH 03/22] working (i think?) recursive fetch --- app/controllers/api/v1/statuses_controller.rb | 32 ++++++------- app/lib/activitypub/activity/create.rb | 2 +- app/models/status.rb | 10 ++++ .../activitypub/fetch_replies_service.rb | 46 ++++++++++++------- ...233930_add_fetched_replies_at_to_status.rb | 9 ++++ db/schema.rb | 1 + 6 files changed, 65 insertions(+), 35 deletions(-) create mode 100644 db/migrate/20240918233930_add_fetched_replies_at_to_status.rb diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 37bdcba36a11d2..462802f3cb362b 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -49,24 +49,20 @@ def context descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT else - unless @status.local? - json_status = fetch_resource(@status.uri, true, @current_account) - - logger.warn "json status" - logger.warn json_status - # rescue this whole block on failure, don't want to fail the whole context request if we can't do this - collection = json_status['replies'] - logger.warn "replies uri" - logger.warn collection - - unless collection.nil? - ActivityPub::FetchRepliesService.new.call( - @status, - collection, - allow_synchronous_requests: true, - all_replies: true - ) - end + unless @status.local? && !@status.should_fetch_replies? + json_status = fetch_resource(@status.uri, true, @current_account) + + # rescue this whole block on failure, don't want to fail the whole context request if we can't do this + collection = json_status['replies'] + + unless collection.nil? + ActivityPub::FetchRepliesService.new.call( + @status, + collection, + allow_synchronous_requests: true, + all_replies: true + ) + end end end diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index cbc5b309b426e3..14ceb2f662c2c5 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -315,7 +315,7 @@ def fetch_replies(status) collection = @object['replies'] return if collection.blank? - replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id]) + replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id], all_replies: status.should_fetch_replies?) return unless replies.nil? uri = value_or_id(collection) diff --git a/app/models/status.rb b/app/models/status.rb index 7bde0438a6a308..85384ff1682b60 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -29,6 +29,7 @@ # edited_at :datetime # trendable :boolean # ordered_media_attachment_ids :bigint(8) is an Array +# fetched_replies_at :datetime # class Status < ApplicationRecord @@ -183,6 +184,8 @@ class Status < ApplicationRecord delegate :domain, to: :account, prefix: true REAL_TIME_WINDOW = 6.hours + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = 1.hour def cache_key "v3:#{super}" @@ -440,6 +443,13 @@ def unlink_from_conversations! end end + def should_fetch_replies? + # we aren't brand new, and we haven't fetched replies since the debounce window + created_at <= 10.minutes.ago && ( + fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago + ) + end + private def update_status_stat!(attrs) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index c516fe759d8a8f..d7a66456b1de73 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -5,22 +5,27 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies used when not fetching all replies MAX_REPLIES_LOW = 5 + # limit of fetched replies used when fetching all replies + MAX_REPLIES_HIGH = 500 def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) # Whether we are getting replies from more than the originating server, # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` @all_replies = all_replies + # store the status and whether we should fetch replies for it to avoid + # race conditions if something else updates us in the meantime + @status = parent_status + @should_fetch_replies = parent_status.should_fetch_replies? @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests @items = collection_items(collection_or_uri) - logger = Logger.new(STDOUT) - logger.warn 'collection items' - logger.warn @items return if @items.nil? FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + # Store last fetched all to debounce + @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? @items end @@ -28,26 +33,30 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req private def collection_items(collection_or_uri) - logger = Logger.new(STDOUT) collection = fetch_collection(collection_or_uri) - logger.warn 'first collection' - logger.warn collection return unless collection.is_a?(Hash) collection = fetch_collection(collection['first']) if collection['first'].present? - logger.warn 'second collection' - logger.warn collection return unless collection.is_a?(Hash) - # Need to do another "next" here. see https://neuromatch.social/users/jonny/statuses/112401738180959195/replies for example - # then we are home free (stopping for tonight tho.) + all_items = [] + while collection.is_a?(Hash) + items = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end - case collection['type'] - when 'Collection', 'CollectionPage' - as_array(collection['items']) - when 'OrderedCollection', 'OrderedCollectionPage' - as_array(collection['orderedItems']) + all_items.concat(as_array(items)) + + # Quit early if we are not fetching all replies + break if all_items.size >= MAX_REPLIES_HIGH || !fetch_all_replies? + + collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end + + all_items end def fetch_collection(collection_or_uri) @@ -73,7 +82,8 @@ def fetch_collection(collection_or_uri) def filtered_replies if @all_replies - @items.map { |item| value_or_id(item) } + # Reject all statuses that we already have in the db + @items.map { |item| value_or_id(item) }.reject { |uri| Status.exists?(uri: uri) } else # Only fetch replies to the same server as the original status to avoid # amplification attacks. @@ -82,4 +92,8 @@ def filtered_replies @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW) end end + + def fetch_all_replies? + @all_replies && @should_fetch_replies + end end diff --git a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb new file mode 100644 index 00000000000000..c42eff6aeb38fc --- /dev/null +++ b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + add_column :statuses, :fetched_replies_at, :datetime, null: true + end +end diff --git a/db/schema.rb b/db/schema.rb index 6335f7f31ac210..ab0425129b5e3d 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -1052,6 +1052,7 @@ t.datetime "edited_at", precision: nil t.boolean "trendable" t.bigint "ordered_media_attachment_ids", array: true + t.datetime "fetched_replies_at" t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)" t.index ["account_id"], name: "index_statuses_on_account_id" t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)" From d7c0ce08a875b416dd1e5159d68b3fb4696c100e Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 20:35:36 -0700 Subject: [PATCH 04/22] don't do it for every create, only do recursive reply expansion when requested from context endpoint, but async --- app/controllers/api/v1/statuses_controller.rb | 2 +- app/lib/activitypub/activity/create.rb | 2 +- app/models/status.rb | 2 +- .../activitypub/fetch_replies_service.rb | 2 +- app/workers/fetch_reply_worker.rb | 23 ++++++++++++++++-- db/schema.rb | 24 +++++++++---------- 6 files changed, 37 insertions(+), 18 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 462802f3cb362b..c7569f4285dfea 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -50,7 +50,7 @@ def context descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT else unless @status.local? && !@status.should_fetch_replies? - json_status = fetch_resource(@status.uri, true, @current_account) + json_status = fetch_resource(@status.uri, true, current_account) # rescue this whole block on failure, don't want to fail the whole context request if we can't do this collection = json_status['replies'] diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index 14ceb2f662c2c5..cbc5b309b426e3 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -315,7 +315,7 @@ def fetch_replies(status) collection = @object['replies'] return if collection.blank? - replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id], all_replies: status.should_fetch_replies?) + replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id]) return unless replies.nil? uri = value_or_id(collection) diff --git a/app/models/status.rb b/app/models/status.rb index 85384ff1682b60..ee7bc8ab8541d5 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -445,7 +445,7 @@ def unlink_from_conversations! def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - created_at <= 10.minutes.ago && ( + !local? && created_at <= 10.minutes.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago ) end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index d7a66456b1de73..c1517837b25c6e 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -23,7 +23,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }, @all_replies] } # Store last fetched all to debounce @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 68a7414bebeaa0..af33e8d6813d55 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -6,7 +6,26 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 - def perform(child_url, options = {}) - FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + def perform(child_url, options = {}, all_replies: false) + status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + + # asked to fetch replies recursively - do the second-level calls async + if all_replies && status.should_fetch_replies? + json_status = fetch_resource(status.uri, true) + + collection = json_status['replies'] + unless collection.nil? + # if expanding replies recursively, spread out the recursive calls + ActivityPub::FetchRepliesWorker.perform_in( + rand(1..30).seconds, + status.id, + collection, + { + allow_synchronous_requests: true, + all_replies: true, + } + ) + end + end end end diff --git a/db/schema.rb b/db/schema.rb index ab0425129b5e3d..2a0267bfc33e6b 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -553,12 +553,12 @@ end create_table "ip_blocks", force: :cascade do |t| + t.datetime "created_at", precision: nil, null: false + t.datetime "updated_at", precision: nil, null: false + t.datetime "expires_at", precision: nil t.inet "ip", default: "0.0.0.0", null: false t.integer "severity", default: 0, null: false - t.datetime "expires_at", precision: nil t.text "comment", default: "", null: false - t.datetime "created_at", precision: nil, null: false - t.datetime "updated_at", precision: nil, null: false t.index ["ip"], name: "index_ip_blocks_on_ip", unique: true end @@ -1381,9 +1381,9 @@ add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true create_view "user_ips", sql_definition: <<-SQL - SELECT user_id, - ip, - max(used_at) AS used_at + SELECT t0.user_id, + t0.ip, + max(t0.used_at) AS used_at FROM ( SELECT users.id AS user_id, users.sign_up_ip AS ip, users.created_at AS used_at @@ -1400,7 +1400,7 @@ login_activities.created_at FROM login_activities WHERE (login_activities.success = true)) t0 - GROUP BY user_id, ip; + GROUP BY t0.user_id, t0.ip; SQL create_view "account_summaries", materialized: true, sql_definition: <<-SQL SELECT accounts.id AS account_id, @@ -1421,9 +1421,9 @@ add_index "account_summaries", ["account_id"], name: "index_account_summaries_on_account_id", unique: true create_view "global_follow_recommendations", materialized: true, sql_definition: <<-SQL - SELECT account_id, - sum(rank) AS rank, - array_agg(reason) AS reason + SELECT t0.account_id, + sum(t0.rank) AS rank, + array_agg(t0.reason) AS reason FROM ( SELECT account_summaries.account_id, ((count(follows.id))::numeric / (1.0 + (count(follows.id))::numeric)) AS rank, 'most_followed'::text AS reason @@ -1447,8 +1447,8 @@ WHERE (follow_recommendation_suppressions.account_id = statuses.account_id))))) GROUP BY account_summaries.account_id HAVING (sum((status_stats.reblogs_count + status_stats.favourites_count)) >= (5)::numeric)) t0 - GROUP BY account_id - ORDER BY (sum(rank)) DESC; + GROUP BY t0.account_id + ORDER BY (sum(t0.rank)) DESC; SQL add_index "global_follow_recommendations", ["account_id"], name: "index_global_follow_recommendations_on_account_id", unique: true From bf86d13f80d4137a63e5bbf4c7361b017648c6c8 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 21:07:28 -0700 Subject: [PATCH 05/22] correct number of args to replies worker, recursive fetching is working --- app/models/status.rb | 2 +- app/services/activitypub/fetch_replies_service.rb | 2 +- app/workers/fetch_reply_worker.rb | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/app/models/status.rb b/app/models/status.rb index ee7bc8ab8541d5..2018c91b769c4e 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -185,7 +185,7 @@ class Status < ApplicationRecord REAL_TIME_WINDOW = 6.hours # debounce fetching all replies to minimize DoS - FETCH_REPLIES_DEBOUNCE = 1.hour + FETCH_REPLIES_DEBOUNCE = 30.minutes def cache_key "v3:#{super}" diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index c1517837b25c6e..18b3d2eddc1fe2 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -23,7 +23,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }, @all_replies] } + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } # Store last fetched all to debounce @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index af33e8d6813d55..280bc8406b4d84 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -3,14 +3,17 @@ class FetchReplyWorker include Sidekiq::Worker include ExponentialBackoff + include JsonLdHelper sidekiq_options queue: 'pull', retry: 3 - def perform(child_url, options = {}, all_replies: false) + def perform(child_url, options = {}) + all_replies = options.delete('all_replies') + status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) # asked to fetch replies recursively - do the second-level calls async - if all_replies && status.should_fetch_replies? + if all_replies && status json_status = fetch_resource(status.uri, true) collection = json_status['replies'] From 2b16ac99b1062b7ab1a1c69f35056fdfc7ec568f Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 21:49:57 -0700 Subject: [PATCH 06/22] accept review comments https://github.com/NeuromatchAcademy/mastodon/pull/44\#discussion_r1766143286 and https://github.com/NeuromatchAcademy/mastodon/pull/44\#discussion_r1766148179 --- app/services/activitypub/fetch_replies_service.rb | 2 +- db/migrate/20240918233930_add_fetched_replies_at_to_status.rb | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 18b3d2eddc1fe2..c09d224263d796 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -25,7 +25,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } # Store last fetched all to debounce - @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? + @status.touch(:fetched_replies_at) @items end diff --git a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb index c42eff6aeb38fc..229e43d978c493 100644 --- a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb +++ b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb @@ -1,8 +1,6 @@ # frozen_string_literal: true class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1] - disable_ddl_transaction! - def change add_column :statuses, :fetched_replies_at, :datetime, null: true end From 714e490c549ae95a5c3bd57261c25b2c974f114b Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 22 Sep 2024 14:46:38 -0700 Subject: [PATCH 07/22] stashing some partial work on streaming --- app/controllers/api/v1/statuses_controller.rb | 28 +++++------ .../flavours/glitch/actions/statuses.js | 16 +++---- .../flavours/glitch/actions/streaming.js | 9 +++- .../flavours/glitch/actions/thread.ts | 12 +++++ .../flavours/glitch/reducers/thread.ts | 0 .../concerns/status/threading_concern.rb | 5 ++ .../activitypub/fetch_replies_service.rb | 14 +++++- app/services/fan_out_on_write_service.rb | 11 +++++ .../activitypub/fetch_replies_worker.rb | 16 ++++++- app/workers/fetch_reply_worker.rb | 47 ++++++++++--------- 10 files changed, 108 insertions(+), 50 deletions(-) create mode 100644 app/javascript/flavours/glitch/actions/thread.ts create mode 100644 app/javascript/flavours/glitch/reducers/thread.ts diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index c7569f4285dfea..fb86bd28c8f4d8 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -48,22 +48,6 @@ def context ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT - else - unless @status.local? && !@status.should_fetch_replies? - json_status = fetch_resource(@status.uri, true, current_account) - - # rescue this whole block on failure, don't want to fail the whole context request if we can't do this - collection = json_status['replies'] - - unless collection.nil? - ActivityPub::FetchRepliesService.new.call( - @status, - collection, - allow_synchronous_requests: true, - all_replies: true - ) - end - end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) @@ -75,6 +59,18 @@ def context statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) + + unless @status.local? && !@status.should_fetch_replies? + ActivityPub::FetchRepliesWorker.perform_async( + @status.id, + nil, + { + allow_synchronous_requests: true, + all_replies: true, + current_account_id: current_account.id, + } + ) + end end def create diff --git a/app/javascript/flavours/glitch/actions/statuses.js b/app/javascript/flavours/glitch/actions/statuses.js index 2b6b589b374d5b..c50ff6b9549933 100644 --- a/app/javascript/flavours/glitch/actions/statuses.js +++ b/app/javascript/flavours/glitch/actions/statuses.js @@ -8,33 +8,33 @@ import { deleteFromTimelines } from './timelines'; export const STATUS_FETCH_REQUEST = 'STATUS_FETCH_REQUEST'; export const STATUS_FETCH_SUCCESS = 'STATUS_FETCH_SUCCESS'; -export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; +export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; export const STATUS_DELETE_REQUEST = 'STATUS_DELETE_REQUEST'; export const STATUS_DELETE_SUCCESS = 'STATUS_DELETE_SUCCESS'; -export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; +export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; export const CONTEXT_FETCH_REQUEST = 'CONTEXT_FETCH_REQUEST'; export const CONTEXT_FETCH_SUCCESS = 'CONTEXT_FETCH_SUCCESS'; -export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; +export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; export const STATUS_MUTE_REQUEST = 'STATUS_MUTE_REQUEST'; export const STATUS_MUTE_SUCCESS = 'STATUS_MUTE_SUCCESS'; -export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; +export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; export const STATUS_UNMUTE_REQUEST = 'STATUS_UNMUTE_REQUEST'; export const STATUS_UNMUTE_SUCCESS = 'STATUS_UNMUTE_SUCCESS'; -export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; +export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; -export const STATUS_REVEAL = 'STATUS_REVEAL'; -export const STATUS_HIDE = 'STATUS_HIDE'; +export const STATUS_REVEAL = 'STATUS_REVEAL'; +export const STATUS_HIDE = 'STATUS_HIDE'; export const STATUS_COLLAPSE = 'STATUS_COLLAPSE'; export const REDRAFT = 'REDRAFT'; export const STATUS_FETCH_SOURCE_REQUEST = 'STATUS_FETCH_SOURCE_REQUEST'; export const STATUS_FETCH_SOURCE_SUCCESS = 'STATUS_FETCH_SOURCE_SUCCESS'; -export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; +export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; export const STATUS_TRANSLATE_REQUEST = 'STATUS_TRANSLATE_REQUEST'; export const STATUS_TRANSLATE_SUCCESS = 'STATUS_TRANSLATE_SUCCESS'; diff --git a/app/javascript/flavours/glitch/actions/streaming.js b/app/javascript/flavours/glitch/actions/streaming.js index fa7af7055e6180..f114171fe73393 100644 --- a/app/javascript/flavours/glitch/actions/streaming.js +++ b/app/javascript/flavours/glitch/actions/streaming.js @@ -195,4 +195,11 @@ export const connectDirectStream = () => * @returns {function(): void} */ export const connectListStream = listId => - connectTimelineStream(`list:${listId}`, 'list', { list: listId }, { fillGaps: () => fillListTimelineGaps(listId) }); + connectTimelineStream(`list:${listId}`, 'list', {list: listId}, {fillGaps: () => fillListTimelineGaps(listId)}); + +/** + * @param {string} rootURI + * @returns {function(): void} + */ +export const connectThreadStream = rootURI => + connectTimelineStream(`thread:${rootURI}`, 'thread', {rootURI: rootURI}); diff --git a/app/javascript/flavours/glitch/actions/thread.ts b/app/javascript/flavours/glitch/actions/thread.ts new file mode 100644 index 00000000000000..c3e26b18e8073e --- /dev/null +++ b/app/javascript/flavours/glitch/actions/thread.ts @@ -0,0 +1,12 @@ +import { createAction } from '@reduxjs/toolkit'; + +import type { ApiStatusJSON } from 'flavours/glitch/api_types/statuses'; + +export const threadMount = createAction('THREAD_MOUNT'); +export const threadUnmount = createAction('THREAD_UNMOUNT'); + +interface ThreadUpdatePayload { + status: ApiStatusJSON; +} + +export const threadUpdate = createAction('THREAD_UPDATE'); diff --git a/app/javascript/flavours/glitch/reducers/thread.ts b/app/javascript/flavours/glitch/reducers/thread.ts new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/app/models/concerns/status/threading_concern.rb b/app/models/concerns/status/threading_concern.rb index 478a139d633be2..59bd7fc7a45dd4 100644 --- a/app/models/concerns/status/threading_concern.rb +++ b/app/models/concerns/status/threading_concern.rb @@ -32,6 +32,11 @@ def self_replies(limit) account.statuses.distributable_visibility.where(in_reply_to_id: id).reorder(id: :asc).limit(limit) end + def thread_root + maybe_ancestors = ancestor_ids(Api::V1::StatusesController::CONTEXT_LIMIT) + maybe_ancestors.none? ? id : maybe_ancestors[0] + end + private def ancestor_ids(limit) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index c09d224263d796..bbc4bba6216efd 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -8,10 +8,11 @@ class ActivityPub::FetchRepliesService < BaseService # limit of fetched replies used when fetching all replies MAX_REPLIES_HIGH = 500 - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false, current_account_id: nil) # Whether we are getting replies from more than the originating server, # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` @all_replies = all_replies + @current_account_id = current_account_id # store the status and whether we should fetch replies for it to avoid # race conditions if something else updates us in the meantime @status = parent_status @@ -23,7 +24,16 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } + FetchReplyWorker.push_bulk(filtered_replies) do |reply_uri| + [ + reply_uri, + { + request_id: request_id, + all_replies: @all_replies, + current_account_id: @current_account_id, + }, + ] + end # Store last fetched all to debounce @status.touch(:fetched_replies_at) diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 71ab1ac494d53f..ec934026899950 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -38,6 +38,7 @@ def check_race_condition! def fan_out_to_local_recipients! deliver_to_self! + deliver_to_thread_stream! unless @options[:skip_notifications] notify_mentioned_accounts! @@ -71,6 +72,12 @@ def deliver_to_self! FeedManager.instance.push_to_direct(@account, @status, update: update?) if @account.local? && @status.direct_visibility? end + def deliver_to_thread_stream + return unless subscribed_to_thread_stream? + + redis.publish("timeline:thread:#{@status.thread_root}", anonymous_payload) + end + def notify_mentioned_accounts! @status.active_mentions.where.not(id: @options[:silenced_account_ids] || []).joins(:account).merge(Account.local).select(:id, :account_id).reorder(nil).find_in_batches do |mentions| LocalNotificationWorker.push_bulk(mentions) do |mention| @@ -183,4 +190,8 @@ def broadcastable? def subscribed_to_streaming_api?(account_id) redis.exists?("subscribed:timeline:#{account_id}") || redis.exists?("subscribed:timeline:#{account_id}:notifications") end + + def subscribed_to_thread_stream? + redis.exists?("subscribed:timeline:thread:#{@status.thread_root}") + end end diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb index d72bad745261d3..a55df06bfa3e7f 100644 --- a/app/workers/activitypub/fetch_replies_worker.rb +++ b/app/workers/activitypub/fetch_replies_worker.rb @@ -6,9 +6,23 @@ class ActivityPub::FetchRepliesWorker sidekiq_options queue: 'pull', retry: 3 - def perform(parent_status_id, replies_uri, options = {}) + def perform(parent_status_id, replies_uri = nil, options = {}) + @current_account_id = options.fetch(:current_account_id, nil) + replies_uri = get_replies_uri(status) if replies_uri.nil? + return if replies_uri.nil? + ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys) rescue ActiveRecord::RecordNotFound true end + + private + + def get_replies_uri(status) + current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) + json_status = fetch_resource(status.uri, true, current_account) + replies_uri = json_status['replies'] + Rails.logger.debug { "Could not find replies uri for status: #{status}" } if replies_uri.nil? + replies_uri + end end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 280bc8406b4d84..ca05d0ae6cad92 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -8,27 +8,30 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) - all_replies = options.delete('all_replies') - - status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) - - # asked to fetch replies recursively - do the second-level calls async - if all_replies && status - json_status = fetch_resource(status.uri, true) - - collection = json_status['replies'] - unless collection.nil? - # if expanding replies recursively, spread out the recursive calls - ActivityPub::FetchRepliesWorker.perform_in( - rand(1..30).seconds, - status.id, - collection, - { - allow_synchronous_requests: true, - all_replies: true, - } - ) - end - end + @all_replies = options.fetch('all_replies', nil) + @current_account_id = options.delete('current_account_id') + + @status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + + recurse + + @status + end + + private + + def recurse + return unless @all_replies && @status + + ActivityPub::FetchRepliesWorker.perform_in( + rand(1..30).seconds, + @status.id, + nil, + { + allow_synchronous_requests: true, + all_replies: true, + current_account_id: @current_account_id, + } + ) end end From 40fbfea17e9b6e79f69f6df1cbd79c61e5361734 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 22 Sep 2024 14:51:23 -0700 Subject: [PATCH 08/22] Revert "stashing some partial work on streaming" This reverts commit 98c280e6a26a2ebbbacf34be0a26a642b1f3dee7. --- app/controllers/api/v1/statuses_controller.rb | 28 ++++++----- .../flavours/glitch/actions/statuses.js | 16 +++---- .../flavours/glitch/actions/streaming.js | 9 +--- .../flavours/glitch/actions/thread.ts | 12 ----- .../flavours/glitch/reducers/thread.ts | 0 .../concerns/status/threading_concern.rb | 5 -- .../activitypub/fetch_replies_service.rb | 14 +----- app/services/fan_out_on_write_service.rb | 11 ----- .../activitypub/fetch_replies_worker.rb | 16 +------ app/workers/fetch_reply_worker.rb | 47 +++++++++---------- 10 files changed, 50 insertions(+), 108 deletions(-) delete mode 100644 app/javascript/flavours/glitch/actions/thread.ts delete mode 100644 app/javascript/flavours/glitch/reducers/thread.ts diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index fb86bd28c8f4d8..c7569f4285dfea 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -48,6 +48,22 @@ def context ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT + else + unless @status.local? && !@status.should_fetch_replies? + json_status = fetch_resource(@status.uri, true, current_account) + + # rescue this whole block on failure, don't want to fail the whole context request if we can't do this + collection = json_status['replies'] + + unless collection.nil? + ActivityPub::FetchRepliesService.new.call( + @status, + collection, + allow_synchronous_requests: true, + all_replies: true + ) + end + end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) @@ -59,18 +75,6 @@ def context statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) - - unless @status.local? && !@status.should_fetch_replies? - ActivityPub::FetchRepliesWorker.perform_async( - @status.id, - nil, - { - allow_synchronous_requests: true, - all_replies: true, - current_account_id: current_account.id, - } - ) - end end def create diff --git a/app/javascript/flavours/glitch/actions/statuses.js b/app/javascript/flavours/glitch/actions/statuses.js index c50ff6b9549933..2b6b589b374d5b 100644 --- a/app/javascript/flavours/glitch/actions/statuses.js +++ b/app/javascript/flavours/glitch/actions/statuses.js @@ -8,33 +8,33 @@ import { deleteFromTimelines } from './timelines'; export const STATUS_FETCH_REQUEST = 'STATUS_FETCH_REQUEST'; export const STATUS_FETCH_SUCCESS = 'STATUS_FETCH_SUCCESS'; -export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; +export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; export const STATUS_DELETE_REQUEST = 'STATUS_DELETE_REQUEST'; export const STATUS_DELETE_SUCCESS = 'STATUS_DELETE_SUCCESS'; -export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; +export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; export const CONTEXT_FETCH_REQUEST = 'CONTEXT_FETCH_REQUEST'; export const CONTEXT_FETCH_SUCCESS = 'CONTEXT_FETCH_SUCCESS'; -export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; +export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; export const STATUS_MUTE_REQUEST = 'STATUS_MUTE_REQUEST'; export const STATUS_MUTE_SUCCESS = 'STATUS_MUTE_SUCCESS'; -export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; +export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; export const STATUS_UNMUTE_REQUEST = 'STATUS_UNMUTE_REQUEST'; export const STATUS_UNMUTE_SUCCESS = 'STATUS_UNMUTE_SUCCESS'; -export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; +export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; -export const STATUS_REVEAL = 'STATUS_REVEAL'; -export const STATUS_HIDE = 'STATUS_HIDE'; +export const STATUS_REVEAL = 'STATUS_REVEAL'; +export const STATUS_HIDE = 'STATUS_HIDE'; export const STATUS_COLLAPSE = 'STATUS_COLLAPSE'; export const REDRAFT = 'REDRAFT'; export const STATUS_FETCH_SOURCE_REQUEST = 'STATUS_FETCH_SOURCE_REQUEST'; export const STATUS_FETCH_SOURCE_SUCCESS = 'STATUS_FETCH_SOURCE_SUCCESS'; -export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; +export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; export const STATUS_TRANSLATE_REQUEST = 'STATUS_TRANSLATE_REQUEST'; export const STATUS_TRANSLATE_SUCCESS = 'STATUS_TRANSLATE_SUCCESS'; diff --git a/app/javascript/flavours/glitch/actions/streaming.js b/app/javascript/flavours/glitch/actions/streaming.js index f114171fe73393..fa7af7055e6180 100644 --- a/app/javascript/flavours/glitch/actions/streaming.js +++ b/app/javascript/flavours/glitch/actions/streaming.js @@ -195,11 +195,4 @@ export const connectDirectStream = () => * @returns {function(): void} */ export const connectListStream = listId => - connectTimelineStream(`list:${listId}`, 'list', {list: listId}, {fillGaps: () => fillListTimelineGaps(listId)}); - -/** - * @param {string} rootURI - * @returns {function(): void} - */ -export const connectThreadStream = rootURI => - connectTimelineStream(`thread:${rootURI}`, 'thread', {rootURI: rootURI}); + connectTimelineStream(`list:${listId}`, 'list', { list: listId }, { fillGaps: () => fillListTimelineGaps(listId) }); diff --git a/app/javascript/flavours/glitch/actions/thread.ts b/app/javascript/flavours/glitch/actions/thread.ts deleted file mode 100644 index c3e26b18e8073e..00000000000000 --- a/app/javascript/flavours/glitch/actions/thread.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { createAction } from '@reduxjs/toolkit'; - -import type { ApiStatusJSON } from 'flavours/glitch/api_types/statuses'; - -export const threadMount = createAction('THREAD_MOUNT'); -export const threadUnmount = createAction('THREAD_UNMOUNT'); - -interface ThreadUpdatePayload { - status: ApiStatusJSON; -} - -export const threadUpdate = createAction('THREAD_UPDATE'); diff --git a/app/javascript/flavours/glitch/reducers/thread.ts b/app/javascript/flavours/glitch/reducers/thread.ts deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/app/models/concerns/status/threading_concern.rb b/app/models/concerns/status/threading_concern.rb index 59bd7fc7a45dd4..478a139d633be2 100644 --- a/app/models/concerns/status/threading_concern.rb +++ b/app/models/concerns/status/threading_concern.rb @@ -32,11 +32,6 @@ def self_replies(limit) account.statuses.distributable_visibility.where(in_reply_to_id: id).reorder(id: :asc).limit(limit) end - def thread_root - maybe_ancestors = ancestor_ids(Api::V1::StatusesController::CONTEXT_LIMIT) - maybe_ancestors.none? ? id : maybe_ancestors[0] - end - private def ancestor_ids(limit) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index bbc4bba6216efd..c09d224263d796 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -8,11 +8,10 @@ class ActivityPub::FetchRepliesService < BaseService # limit of fetched replies used when fetching all replies MAX_REPLIES_HIGH = 500 - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false, current_account_id: nil) + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) # Whether we are getting replies from more than the originating server, # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` @all_replies = all_replies - @current_account_id = current_account_id # store the status and whether we should fetch replies for it to avoid # race conditions if something else updates us in the meantime @status = parent_status @@ -24,16 +23,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) do |reply_uri| - [ - reply_uri, - { - request_id: request_id, - all_replies: @all_replies, - current_account_id: @current_account_id, - }, - ] - end + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } # Store last fetched all to debounce @status.touch(:fetched_replies_at) diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index ec934026899950..71ab1ac494d53f 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -38,7 +38,6 @@ def check_race_condition! def fan_out_to_local_recipients! deliver_to_self! - deliver_to_thread_stream! unless @options[:skip_notifications] notify_mentioned_accounts! @@ -72,12 +71,6 @@ def deliver_to_self! FeedManager.instance.push_to_direct(@account, @status, update: update?) if @account.local? && @status.direct_visibility? end - def deliver_to_thread_stream - return unless subscribed_to_thread_stream? - - redis.publish("timeline:thread:#{@status.thread_root}", anonymous_payload) - end - def notify_mentioned_accounts! @status.active_mentions.where.not(id: @options[:silenced_account_ids] || []).joins(:account).merge(Account.local).select(:id, :account_id).reorder(nil).find_in_batches do |mentions| LocalNotificationWorker.push_bulk(mentions) do |mention| @@ -190,8 +183,4 @@ def broadcastable? def subscribed_to_streaming_api?(account_id) redis.exists?("subscribed:timeline:#{account_id}") || redis.exists?("subscribed:timeline:#{account_id}:notifications") end - - def subscribed_to_thread_stream? - redis.exists?("subscribed:timeline:thread:#{@status.thread_root}") - end end diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb index a55df06bfa3e7f..d72bad745261d3 100644 --- a/app/workers/activitypub/fetch_replies_worker.rb +++ b/app/workers/activitypub/fetch_replies_worker.rb @@ -6,23 +6,9 @@ class ActivityPub::FetchRepliesWorker sidekiq_options queue: 'pull', retry: 3 - def perform(parent_status_id, replies_uri = nil, options = {}) - @current_account_id = options.fetch(:current_account_id, nil) - replies_uri = get_replies_uri(status) if replies_uri.nil? - return if replies_uri.nil? - + def perform(parent_status_id, replies_uri, options = {}) ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys) rescue ActiveRecord::RecordNotFound true end - - private - - def get_replies_uri(status) - current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) - json_status = fetch_resource(status.uri, true, current_account) - replies_uri = json_status['replies'] - Rails.logger.debug { "Could not find replies uri for status: #{status}" } if replies_uri.nil? - replies_uri - end end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index ca05d0ae6cad92..280bc8406b4d84 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -8,30 +8,27 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) - @all_replies = options.fetch('all_replies', nil) - @current_account_id = options.delete('current_account_id') - - @status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) - - recurse - - @status - end - - private - - def recurse - return unless @all_replies && @status - - ActivityPub::FetchRepliesWorker.perform_in( - rand(1..30).seconds, - @status.id, - nil, - { - allow_synchronous_requests: true, - all_replies: true, - current_account_id: @current_account_id, - } - ) + all_replies = options.delete('all_replies') + + status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + + # asked to fetch replies recursively - do the second-level calls async + if all_replies && status + json_status = fetch_resource(status.uri, true) + + collection = json_status['replies'] + unless collection.nil? + # if expanding replies recursively, spread out the recursive calls + ActivityPub::FetchRepliesWorker.perform_in( + rand(1..30).seconds, + status.id, + collection, + { + allow_synchronous_requests: true, + all_replies: true, + } + ) + end + end end end From 1632d2ff1e50a337732d314ba8f9d91ccd4a8207 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:24:04 -0700 Subject: [PATCH 09/22] Remove recursion, separate out into separate workers/services, add limit to global maximum statuses fetched (untested, this might not work yet) --- app/controllers/api/v1/statuses_controller.rb | 16 +++++-- .../concerns/status/fetch_replies_concern.rb | 27 +++++++++++ app/models/status.rb | 10 +---- .../activitypub/fetch_all_replies_service.rb | 37 +++++++++++++++ .../activitypub/fetch_replies_service.rb | 34 +++++--------- .../activitypub/fetch_all_replies_worker.rb | 45 +++++++++++++++++++ app/workers/fetch_reply_worker.rb | 23 +--------- 7 files changed, 134 insertions(+), 58 deletions(-) create mode 100644 app/models/concerns/status/fetch_replies_concern.rb create mode 100644 app/services/activitypub/fetch_all_replies_service.rb create mode 100644 app/workers/activitypub/fetch_all_replies_worker.rb diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index c7569f4285dfea..7509b0cac808fd 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -66,15 +66,25 @@ def context end end - ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) + ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) descendants_results = @status.descendants(descendants_limit, current_account, descendants_depth_limit) - loaded_ancestors = preload_collection(ancestors_results, Status) - loaded_descendants = preload_collection(descendants_results, Status) + loaded_ancestors = preload_collection(ancestors_results, Status) + loaded_descendants = preload_collection(descendants_results, Status) @context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants) statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) + + if @status.should_fetch_replies? + ActivityPub::FetchAllRepliesWorker.perform_async( + @status.id, + { + allow_synchronous_requests: true, + current_account_id: current_account.id, + } + ) + end end def create diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb new file mode 100644 index 00000000000000..10ddb71f099b29 --- /dev/null +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Status::FetchRepliesConcern + extend ActiveSupport::Concern + + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = 30.minutes + + CREATED_RECENTLY_DEBOUNCE = 10.minutes + + included do + scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) } + scope :not_created_recently, -> { where(created_at: ..CREATED_RECENTLY_DEBOUNCE.ago) } + scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_DEBOUNCE.ago..) } + scope :not_fetched_recently, -> { where(fetched_replies_at: ..FETCH_REPLIES_DEBOUNCE.ago).or(where(fetched_replies_at: nil)) } + + scope :shouldnt_fetch_replies, -> { local.merge(created_recently).merge(fetched_recently) } + scope :should_fetch_replies, -> { local.invert_where.merge(not_created_recently).merge(not_fetched_recently) } + end + + def should_fetch_replies? + # we aren't brand new, and we haven't fetched replies since the debounce window + !local? && created_at <= 10.minutes.ago && ( + fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago + ) + end +end diff --git a/app/models/status.rb b/app/models/status.rb index 2018c91b769c4e..39b4f791531aa6 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -37,6 +37,7 @@ class Status < ApplicationRecord include Discard::Model include Paginable include RateLimitable + include Status::FetchRepliesConcern include Status::SafeReblogInsert include Status::SearchConcern include Status::SnapshotConcern @@ -184,8 +185,6 @@ class Status < ApplicationRecord delegate :domain, to: :account, prefix: true REAL_TIME_WINDOW = 6.hours - # debounce fetching all replies to minimize DoS - FETCH_REPLIES_DEBOUNCE = 30.minutes def cache_key "v3:#{super}" @@ -443,13 +442,6 @@ def unlink_from_conversations! end end - def should_fetch_replies? - # we aren't brand new, and we haven't fetched replies since the debounce window - !local? && created_at <= 10.minutes.ago && ( - fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago - ) - end - private def update_status_stat!(attrs) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb new file mode 100644 index 00000000000000..19fae824dff495 --- /dev/null +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService + include JsonLdHelper + + # Limit of replies to fetch per status + MAX_REPLIES = 500 + + def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) + @allow_synchronous_requests = allow_synchronous_requests + @filter_by_host = false + + @items = collection_items(collection_or_uri) + @items = filtered_replies + return if @items.nil? + + FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + + @items + end + + private + + def filtered_replies + return if @items.nil? + + # find all statuses that we *shouldn't* update the replies for, and use that as a filter + uris = @items.map { |item| value_or_id(item) } + dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) + + # touch all statuses that already exist and that we're about to update + Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) + + # Reject all statuses that we already have in the db + uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + end +end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index c09d224263d796..dcaa9385bb311b 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -3,22 +3,13 @@ class ActivityPub::FetchRepliesService < BaseService include JsonLdHelper - # Limit of fetched replies used when not fetching all replies - MAX_REPLIES_LOW = 5 - # limit of fetched replies used when fetching all replies - MAX_REPLIES_HIGH = 500 - - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) - # Whether we are getting replies from more than the originating server, - # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` - @all_replies = all_replies - # store the status and whether we should fetch replies for it to avoid - # race conditions if something else updates us in the meantime - @status = parent_status - @should_fetch_replies = parent_status.should_fetch_replies? + # Limit of fetched replies + MAX_REPLIES = 5 + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true) @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests + @filter_by_host = filter_by_host @items = collection_items(collection_or_uri) return if @items.nil? @@ -51,7 +42,7 @@ def collection_items(collection_or_uri) all_items.concat(as_array(items)) # Quit early if we are not fetching all replies - break if all_items.size >= MAX_REPLIES_HIGH || !fetch_all_replies? + break if all_items.size >= MAX_REPLIES collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end @@ -62,7 +53,7 @@ def collection_items(collection_or_uri) def fetch_collection(collection_or_uri) return collection_or_uri if collection_or_uri.is_a?(Hash) return unless @allow_synchronous_requests - return if !@all_replies && non_matching_uri_hosts?(@account.uri, collection_or_uri) + return if @filter_by_host && non_matching_uri_hosts?(@account.uri, collection_or_uri) # NOTE: For backward compatibility reasons, Mastodon signs outgoing # queries incorrectly by default. @@ -81,19 +72,14 @@ def fetch_collection(collection_or_uri) end def filtered_replies - if @all_replies - # Reject all statuses that we already have in the db - @items.map { |item| value_or_id(item) }.reject { |uri| Status.exists?(uri: uri) } - else + if @filter_by_host # Only fetch replies to the same server as the original status to avoid # amplification attacks. # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW) + @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES) + else + @items.map { |item| value_or_id(item) }.take(MAX_REPLIES) end end - - def fetch_all_replies? - @all_replies && @should_fetch_replies - end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb new file mode 100644 index 00000000000000..882faf7dca3a24 --- /dev/null +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +# Fetch all replies to a status, querying recursively through +# ActivityPub replies collections, fetching any statuses that +# we either don't already have or we haven't checked for new replies +# in the Status::FETCH_REPLIES_DEBOUNCE interval +class ActivityPub::FetchAllRepliesWorker + include Sidekiq::Worker + include ExponentialBackoff + + sidekiq_options queue: 'pull', retry: 3 + + # Global max replies to fetch per request + MAX_REPLIES = 1000 + + def perform(parent_status_id, options = {}) + @parent_status = Status.find(parent_status_id) + @current_account_id = options.fetch(:current_account_id, nil) + @current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) + + all_replies = get_replies(@parent_status.uri) + got_replies = all_replies.length + until all_replies.empty? || got_replies >= MAX_REPLIES + new_replies = get_replies(all_replies.pop) + got_replies += new_replies.length + all_replies << new_replies + end + end + + private + + def get_replies(status_uri) + replies_uri = get_replies_uri(status_uri) + return if replies_uri.nil? + + ActivityPub::FetchAllRepliesService.new.call(replies_uri, **options.deep_symbolize_keys) + end + + def get_replies_uri(parent_status_uri) + json_status = fetch_resource(parent_status_uri, true, @current_account) + replies_uri = json_status['replies'] + Rails.logger.debug { "Could not find replies uri for status URI: #{parent_status_uri}" } if replies_uri.nil? + replies_uri + end +end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 280bc8406b4d84..08739babf07e10 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -8,27 +8,6 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) - all_replies = options.delete('all_replies') - - status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) - - # asked to fetch replies recursively - do the second-level calls async - if all_replies && status - json_status = fetch_resource(status.uri, true) - - collection = json_status['replies'] - unless collection.nil? - # if expanding replies recursively, spread out the recursive calls - ActivityPub::FetchRepliesWorker.perform_in( - rand(1..30).seconds, - status.id, - collection, - { - allow_synchronous_requests: true, - all_replies: true, - } - ) - end - end + FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) end end From 4aaf91e8940fea7ddf66d129905993f814136b49 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:29:52 -0700 Subject: [PATCH 10/22] rm redundant request to fetch replies worker in controller --- app/controllers/api/v1/statuses_controller.rb | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 7509b0cac808fd..fcafc5d1365e05 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -48,22 +48,6 @@ def context ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT - else - unless @status.local? && !@status.should_fetch_replies? - json_status = fetch_resource(@status.uri, true, current_account) - - # rescue this whole block on failure, don't want to fail the whole context request if we can't do this - collection = json_status['replies'] - - unless collection.nil? - ActivityPub::FetchRepliesService.new.call( - @status, - collection, - allow_synchronous_requests: true, - all_replies: true - ) - end - end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) @@ -76,7 +60,7 @@ def context render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) - if @status.should_fetch_replies? + if !current_account.nil? && @status.should_fetch_replies? ActivityPub::FetchAllRepliesWorker.perform_async( @status.id, { From a65e29fa707a478049920a356497af064217b73f Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:32:25 -0700 Subject: [PATCH 11/22] rm zombie code in fetch_replies_service --- app/services/activitypub/fetch_replies_service.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index dcaa9385bb311b..0dcdb638e1339e 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -14,9 +14,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } - # Store last fetched all to debounce - @status.touch(:fetched_replies_at) + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } @items end From 9d404e291600141587453095edba2350bd775ae9 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:37:47 -0700 Subject: [PATCH 12/22] rm spurious imports and reformatting --- app/controllers/api/v1/statuses_controller.rb | 11 +++++------ app/workers/fetch_reply_worker.rb | 1 - 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index fcafc5d1365e05..834862761665d0 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -2,7 +2,6 @@ class Api::V1::StatusesController < Api::BaseController include Authorization - include JsonLdHelper before_action -> { authorize_if_got_token! :read, :'read:statuses' }, except: [:create, :update, :destroy] before_action -> { doorkeeper_authorize! :write, :'write:statuses' }, only: [:create, :update, :destroy] @@ -45,15 +44,15 @@ def context descendants_depth_limit = nil if current_account.nil? - ancestors_limit = ANCESTORS_LIMIT - descendants_limit = DESCENDANTS_LIMIT + ancestors_limit = ANCESTORS_LIMIT + descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT end - ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) + ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) descendants_results = @status.descendants(descendants_limit, current_account, descendants_depth_limit) - loaded_ancestors = preload_collection(ancestors_results, Status) - loaded_descendants = preload_collection(descendants_results, Status) + loaded_ancestors = preload_collection(ancestors_results, Status) + loaded_descendants = preload_collection(descendants_results, Status) @context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants) statuses = [@status] + @context.ancestors + @context.descendants diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 08739babf07e10..68a7414bebeaa0 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -3,7 +3,6 @@ class FetchReplyWorker include Sidekiq::Worker include ExponentialBackoff - include JsonLdHelper sidekiq_options queue: 'pull', retry: 3 From 37e2c0b17996b76ec15d19fd1e6dca73f0db9954 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:38:23 -0700 Subject: [PATCH 13/22] rm more spurious formatting --- app/controllers/api/v1/statuses_controller.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 834862761665d0..e2adaa796d5c60 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -44,8 +44,8 @@ def context descendants_depth_limit = nil if current_account.nil? - ancestors_limit = ANCESTORS_LIMIT - descendants_limit = DESCENDANTS_LIMIT + ancestors_limit = ANCESTORS_LIMIT + descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT end From fc447f5d44c5cbe9799fbe4b28928e21fafd1719 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sat, 12 Oct 2024 21:02:31 -0700 Subject: [PATCH 14/22] Working version of fetch all replies service with global maximum on fetching --- app/controllers/api/v1/statuses_controller.rb | 2 +- .../concerns/status/fetch_replies_concern.rb | 2 +- .../activitypub/fetch_all_replies_worker.rb | 37 +++++++++++++------ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index e2adaa796d5c60..985e524070a78d 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -62,9 +62,9 @@ def context if !current_account.nil? && @status.should_fetch_replies? ActivityPub::FetchAllRepliesWorker.perform_async( @status.id, + current_account.id, { allow_synchronous_requests: true, - current_account_id: current_account.id, } ) end diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb index 10ddb71f099b29..696e93ec88555c 100644 --- a/app/models/concerns/status/fetch_replies_concern.rb +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -20,7 +20,7 @@ module Status::FetchRepliesConcern def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - !local? && created_at <= 10.minutes.ago && ( + !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago ) end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 882faf7dca3a24..af2cf40e91ca1d 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -7,29 +7,39 @@ class ActivityPub::FetchAllRepliesWorker include Sidekiq::Worker include ExponentialBackoff + include JsonLdHelper sidekiq_options queue: 'pull', retry: 3 # Global max replies to fetch per request MAX_REPLIES = 1000 - def perform(parent_status_id, options = {}) + def perform(parent_status_id, current_account_id = nil, options = {}) @parent_status = Status.find(parent_status_id) - @current_account_id = options.fetch(:current_account_id, nil) - @current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) + @current_account_id = current_account_id + @current_account = @current_account_id.nil? ? nil : Account.find(@current_account_id) + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } - all_replies = get_replies(@parent_status.uri) + all_replies = get_replies(@parent_status.uri, options) got_replies = all_replies.length until all_replies.empty? || got_replies >= MAX_REPLIES - new_replies = get_replies(all_replies.pop) + next_reply = all_replies.pop + next if next_reply.nil? + + new_replies = get_replies(next_reply, options) + next if new_replies.nil? + got_replies += new_replies.length - all_replies << new_replies + all_replies.concat(new_replies) end + + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{got_replies} replies" } + got_replies end private - def get_replies(status_uri) + def get_replies(status_uri, options = {}) replies_uri = get_replies_uri(status_uri) return if replies_uri.nil? @@ -37,9 +47,14 @@ def get_replies(status_uri) end def get_replies_uri(parent_status_uri) - json_status = fetch_resource(parent_status_uri, true, @current_account) - replies_uri = json_status['replies'] - Rails.logger.debug { "Could not find replies uri for status URI: #{parent_status_uri}" } if replies_uri.nil? - replies_uri + begin + json_status = fetch_resource(parent_status_uri, true, @current_account) + replies_uri = json_status['replies'] + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_uri.nil? + replies_uri + rescue => e + Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status_id}: Got exception while resolving replies URI: #{e} - #{e.message}" } + nil + end end end From 2eeb6c4f1f14c3e0c6673a78d1d2cd96608c61b5 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sat, 12 Oct 2024 21:26:12 -0700 Subject: [PATCH 15/22] Fix limit in fetch_replies_service to not always limit by 5 (which always caused us to only do one page). Rename some variables to make purpose clearer. Return the array of all fetched uris instead of just the number we got --- .../activitypub/fetch_all_replies_service.rb | 4 +++- .../activitypub/fetch_replies_service.rb | 8 ++++--- .../activitypub/fetch_all_replies_worker.rb | 22 +++++++++---------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 19fae824dff495..cba476ab46c30c 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -10,7 +10,7 @@ def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false - @items = collection_items(collection_or_uri) + @items = collection_items(collection_or_uri, fetch_all: true) @items = filtered_replies return if @items.nil? @@ -25,6 +25,8 @@ def filtered_replies return if @items.nil? # find all statuses that we *shouldn't* update the replies for, and use that as a filter + # Typically we assume this is smaller than the replies we *should* fetch, + # so we minimize the number of uris we should load here. uris = @items.map { |item| value_or_id(item) } dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 0dcdb638e1339e..2963f616e2882b 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -5,6 +5,8 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies MAX_REPLIES = 5 + # Limit when fetching all (to prevent infinite fetch attack) + FETCH_ALL_MAX_REPLIES = 500 def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true) @account = parent_status.account @@ -21,7 +23,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req private - def collection_items(collection_or_uri) + def collection_items(collection_or_uri, fetch_all: false) collection = fetch_collection(collection_or_uri) return unless collection.is_a?(Hash) @@ -39,8 +41,8 @@ def collection_items(collection_or_uri) all_items.concat(as_array(items)) - # Quit early if we are not fetching all replies - break if all_items.size >= MAX_REPLIES + # Quit early if we are not fetching all replies or we've reached the absolute max + break if (!fetch_all && all_items.size >= MAX_REPLIES) || (all_items.size >= FETCH_ALL_MAX_REPLIES) collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index af2cf40e91ca1d..222736224b2439 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -11,7 +11,7 @@ class ActivityPub::FetchAllRepliesWorker sidekiq_options queue: 'pull', retry: 3 - # Global max replies to fetch per request + # Global max replies to fetch per request (all replies, recursively) MAX_REPLIES = 1000 def perform(parent_status_id, current_account_id = nil, options = {}) @@ -20,21 +20,21 @@ def perform(parent_status_id, current_account_id = nil, options = {}) @current_account = @current_account_id.nil? ? nil : Account.find(@current_account_id) Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } - all_replies = get_replies(@parent_status.uri, options) - got_replies = all_replies.length - until all_replies.empty? || got_replies >= MAX_REPLIES - next_reply = all_replies.pop + uris_to_fetch = get_replies(@parent_status.uri, options) + fetched_uris = uris_to_fetch + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES + next_reply = uris_to_fetch.pop next if next_reply.nil? - new_replies = get_replies(next_reply, options) - next if new_replies.nil? + new_reply_uris = get_replies(next_reply, options) + next if new_reply_uris.nil? - got_replies += new_replies.length - all_replies.concat(new_replies) + uris_to_fetch.concat(new_reply_uris) + fetched_uris.concat(new_reply_uris) end - Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{got_replies} replies" } - got_replies + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } + fetched_uris end private From 6d3ccb91ece0fc78f82a03f6dd0339857d25bfe0 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 13 Oct 2024 00:58:45 -0700 Subject: [PATCH 16/22] the most basic test you could imagine --- .../activitypub/fetch_all_replies_service.rb | 12 ++- .../fetch_all_replies_service_spec.rb | 73 +++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 spec/services/activitypub/fetch_all_replies_service_spec.rb diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index cba476ab46c30c..292d5aad301c01 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -24,9 +24,15 @@ def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) def filtered_replies return if @items.nil? - # find all statuses that we *shouldn't* update the replies for, and use that as a filter - # Typically we assume this is smaller than the replies we *should* fetch, - # so we minimize the number of uris we should load here. + # Find all statuses that we *shouldn't* update the replies for, and use that as a filter. + # We don't assume that we have the statuses before they're created, + # hence the negative filter - + # "keep all these uris except the ones we already have" + # instead of + # "keep all these uris that match some conditions on existing Status objects" + # + # Typically we assume the number of replies we *shouldn't* fetch is smaller than the + # replies we *should* fetch, so we also minimize the number of uris we should load here. uris = @items.map { |item| value_or_id(item) } dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) diff --git a/spec/services/activitypub/fetch_all_replies_service_spec.rb b/spec/services/activitypub/fetch_all_replies_service_spec.rb new file mode 100644 index 00000000000000..5ab1536d519f9b --- /dev/null +++ b/spec/services/activitypub/fetch_all_replies_service_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesService do + subject { described_class.new } + + let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account') } + let(:status) { Fabricate(:status, account: actor) } + let(:collection_uri) { 'http://example.com/replies/1' } + + let(:items) do + [ + 'http://example.com/self-reply-1', + 'http://example.com/self-reply-2', + 'http://example.com/self-reply-3', + 'http://other.com/other-reply-1', + 'http://other.com/other-reply-2', + 'http://other.com/other-reply-3', + 'http://example.com/self-reply-4', + 'http://example.com/self-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'Collection', + id: collection_uri, + items: items, + }.with_indifferent_access + end + + describe '#call' do + it 'fetches more than the default maximum and from multiple domains' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-2 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 + http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + + context 'with a recent status' do + before do + Fabricate(:status, uri: 'http://example.com/self-reply-2', fetched_replies_at: 1.second.ago, local: false) + end + + it 'skips statuses that have been updated recently' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + end + + context 'with an old status' do + before do + Fabricate(:status, uri: 'http://other.com/other-reply-1', fetched_replies_at: 1.year.ago, created_at: 1.year.ago, account: actor) + end + + it 'updates the time that fetched statuses were last fetched' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(Status.find_by(uri: 'http://other.com/other-reply-1').fetched_replies_at).to be >= 1.minute.ago + end + end + end +end From f7b309f8f32f97fb2738c44be03bcf63ef7bbc92 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 13 Oct 2024 18:10:45 -0700 Subject: [PATCH 17/22] tests for the fetch all reply worker --- .../activitypub/fetch_all_replies_worker.rb | 16 +- .../fetch_all_replies_worker_spec.rb | 254 ++++++++++++++++++ 2 files changed, 263 insertions(+), 7 deletions(-) create mode 100644 spec/workers/activitypub/fetch_all_replies_worker_spec.rb diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 222736224b2439..53e132f61ee6f9 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -21,7 +21,9 @@ def perform(parent_status_id, current_account_id = nil, options = {}) Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } uris_to_fetch = get_replies(@parent_status.uri, options) - fetched_uris = uris_to_fetch + fetched_uris = uris_to_fetch.clone + return if uris_to_fetch.nil? + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES next_reply = uris_to_fetch.pop next if next_reply.nil? @@ -40,18 +42,18 @@ def perform(parent_status_id, current_account_id = nil, options = {}) private def get_replies(status_uri, options = {}) - replies_uri = get_replies_uri(status_uri) - return if replies_uri.nil? + replies_collection_or_uri = get_replies_uri(status_uri) + return if replies_collection_or_uri.nil? - ActivityPub::FetchAllRepliesService.new.call(replies_uri, **options.deep_symbolize_keys) + ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, **options.deep_symbolize_keys) end def get_replies_uri(parent_status_uri) begin json_status = fetch_resource(parent_status_uri, true, @current_account) - replies_uri = json_status['replies'] - Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_uri.nil? - replies_uri + replies_collection_or_uri = json_status['replies'] + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_collection_or_uri.nil? + replies_collection_or_uri rescue => e Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status_id}: Got exception while resolving replies URI: #{e} - #{e.message}" } nil diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb new file mode 100644 index 00000000000000..35fadb6caf1497 --- /dev/null +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -0,0 +1,254 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesWorker do + subject { described_class.new } + + let(:top_items) do + [ + 'http://example.com/self-reply-1', + 'http://other.com/other-reply-2', + 'http://example.com/self-reply-3', + ] + end + + let(:top_items_paged) do + [ + 'http://example.com/self-reply-4', + 'http://other.com/other-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:nested_items) do + [ + 'http://example.com/nested-self-reply-1', + 'http://other.com/nested-other-reply-2', + 'http://example.com/nested-self-reply-3', + ] + end + + let(:nested_items_paged) do + [ + 'http://example.com/nested-self-reply-4', + 'http://other.com/nested-other-reply-5', + 'http://example.com/nested-self-reply-6', + ] + end + + let(:all_items) do + top_items + top_items_paged + nested_items + nested_items_paged + end + + let(:top_note_uri) do + 'http://example.com/top-post' + end + + let(:top_collection_uri) do + 'http://example.com/top-post/replies' + end + + # The reply uri that has the nested replies under it + let(:reply_note_uri) do + 'http://other.com/other-reply-2' + end + + # The collection uri of nested replies + let(:reply_collection_uri) do + 'http://other.com/other-reply-2/replies' + end + + let(:replies_top) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_collection_uri, + type: 'Collection', + items: top_items + top_items_paged, + } + end + + let(:replies_nested) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_collection_uri, + type: 'Collection', + items: nested_items + nested_items_paged, + } + end + + # The status resource for the top post + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_top, + attributedTo: 'https://example.com', + } + end + + # The status resource that has the uri to the replies collection + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_nested, + attributedTo: 'https://other.com', + } + end + + let(:empty_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://example.com/empty', + type: 'Note', + content: 'Lorem ipsum', + replies: [], + attributedTo: 'https://example.com', + } + end + + let(:account) { Fabricate(:account, domain: 'example.com') } + let(:status) { Fabricate(:status, account: account, uri: top_note_uri) } + + before do + allow(FetchReplyWorker).to receive(:push_bulk) + all_items.each do |item| + next if [top_note_uri, reply_note_uri].include? item + + stub_request(:get, item).to_return(status: 200, body: Oj.dump(empty_object), headers: { 'Content-Type': 'application/activity+json' }) + end + end + + shared_examples 'fetches all replies' do + before do + stub_request(:get, top_note_uri).to_return(status: 200, body: Oj.dump(top_object), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_note_uri).to_return(status: 200, body: Oj.dump(reply_object), headers: { 'Content-Type': 'application/activity+json' }) + end + + it 'fetches statuses recursively' do + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(all_items) + end + + it 'respects the maxium limits set by not recursing after the max is reached' do + stub_const('ActivityPub::FetchAllRepliesWorker::MAX_REPLIES', 5) + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(top_items + top_items_paged) + end + end + + describe 'perform' do + context 'when the payload is a Note with replies as a Collection of inlined replies' do + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a URI to a Collection' do + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: top_collection_uri, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: reply_collection_uri, + attributedTo: 'https://other.com', + } + end + + before do + stub_request(:get, top_collection_uri).to_return(status: 200, body: Oj.dump(replies_top), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_collection_uri).to_return(status: 200, body: Oj.dump(replies_nested), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a paginated collection' do + let(:top_page_2_uri) do + "#{top_collection_uri}/2" + end + + let(:reply_page_2_uri) do + "#{reply_collection_uri}/2" + end + + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: top_collection_uri, + first: { + type: 'CollectionPage', + partOf: top_collection_uri, + items: top_items, + next: top_page_2_uri, + }, + }, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: reply_collection_uri, + first: { + type: 'CollectionPage', + partOf: reply_collection_uri, + items: nested_items, + next: reply_page_2_uri, + }, + }, + attributedTo: 'https://other.com', + } + end + + let(:top_page_two) do + { + type: 'CollectionPage', + id: top_page_2_uri, + partOf: top_collection_uri, + items: top_items_paged, + } + end + + let(:reply_page_two) do + { + type: 'CollectionPage', + id: reply_page_2_uri, + partOf: reply_collection_uri, + items: nested_items_paged, + } + end + + before do + stub_request(:get, top_page_2_uri).to_return(status: 200, body: Oj.dump(top_page_two), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_page_2_uri).to_return(status: 200, body: Oj.dump(reply_page_two), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + end + end +end From 3271217ba1137cbf6d393b4685532864b00d45cb Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 14 Oct 2024 19:57:54 -0700 Subject: [PATCH 18/22] add configurability via .env file --- .env.production.sample | 9 +++++++++ app/models/concerns/status/fetch_replies_concern.rb | 10 ++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/.env.production.sample b/.env.production.sample index 83e2b198d22879..a714fc94059b93 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -312,6 +312,15 @@ MAX_POLL_OPTION_CHARS=100 # New registrations will automatically follow these accounts (separated by commas) AUTOFOLLOW= +# -- Fetch all replies settings -- +# When a user expands a post (DetailedStatus view), fetch all of its replies +# (default: true if unset, set explicitly to ``false`` to disable) +FETCH_REPLIES_ENABLED=true +# Period to wait between fetching replies (in minutes) +FETCH_REPLIES_DEBOUNCE=15 +# Period to wait after a post is first created before fetching its replies (in minutes) +FETCH_REPLIES_CREATED_RECENTLY=5 + # IP and session retention # ----------------------- # Make sure to modify the scheduling of ip_cleanup_scheduler in config/sidekiq.yml diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb index 696e93ec88555c..62c908840a72e2 100644 --- a/app/models/concerns/status/fetch_replies_concern.rb +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -3,10 +3,12 @@ module Status::FetchRepliesConcern extend ActiveSupport::Concern - # debounce fetching all replies to minimize DoS - FETCH_REPLIES_DEBOUNCE = 30.minutes + # enable/disable fetching all replies + FETCH_REPLIES_ENABLED = ENV.key?('FETCH_REPLIES_ENABLED') ? ENV['FETCH_REPLIES_ENABLED'] == 'true' : true - CREATED_RECENTLY_DEBOUNCE = 10.minutes + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = (ENV['FETCH_REPLIES_DEBOUNCE'] || 15).to_i.minutes + CREATED_RECENTLY_DEBOUNCE = (ENV['FETCH_REPLIES_CREATED_RECENTLY'] || 5).to_i.minutes included do scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) } @@ -20,7 +22,7 @@ module Status::FetchRepliesConcern def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && ( + FETCH_REPLIES_ENABLED && !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago ) end From cc1a1be51aba72b420b1ee9b102f740f67fad669 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 14 Oct 2024 21:41:19 -0700 Subject: [PATCH 19/22] remove account on_behalf_of entirely since it doesn't do anything in this context anyway --- .env.production.sample | 2 ++ app/controllers/api/v1/statuses_controller.rb | 1 - app/workers/activitypub/fetch_all_replies_worker.rb | 6 ++---- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.env.production.sample b/.env.production.sample index a714fc94059b93..b3ef77aaf131a5 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -316,8 +316,10 @@ AUTOFOLLOW= # When a user expands a post (DetailedStatus view), fetch all of its replies # (default: true if unset, set explicitly to ``false`` to disable) FETCH_REPLIES_ENABLED=true + # Period to wait between fetching replies (in minutes) FETCH_REPLIES_DEBOUNCE=15 + # Period to wait after a post is first created before fetching its replies (in minutes) FETCH_REPLIES_CREATED_RECENTLY=5 diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 985e524070a78d..c9ce8e348c00ae 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -62,7 +62,6 @@ def context if !current_account.nil? && @status.should_fetch_replies? ActivityPub::FetchAllRepliesWorker.perform_async( @status.id, - current_account.id, { allow_synchronous_requests: true, } diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 53e132f61ee6f9..c739cafbb183ac 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -14,10 +14,8 @@ class ActivityPub::FetchAllRepliesWorker # Global max replies to fetch per request (all replies, recursively) MAX_REPLIES = 1000 - def perform(parent_status_id, current_account_id = nil, options = {}) + def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) - @current_account_id = current_account_id - @current_account = @current_account_id.nil? ? nil : Account.find(@current_account_id) Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } uris_to_fetch = get_replies(@parent_status.uri, options) @@ -50,7 +48,7 @@ def get_replies(status_uri, options = {}) def get_replies_uri(parent_status_uri) begin - json_status = fetch_resource(parent_status_uri, true, @current_account) + json_status = fetch_resource(parent_status_uri, true) replies_collection_or_uri = json_status['replies'] Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_collection_or_uri.nil? replies_collection_or_uri From f6c0079b9bbb12b8300386b0ef03dadc4644511e Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 14 Oct 2024 22:10:41 -0700 Subject: [PATCH 20/22] make limits configurable too while we're at it --- .env.production.sample | 7 +++++++ app/services/activitypub/fetch_all_replies_service.rb | 2 +- app/workers/activitypub/fetch_all_replies_worker.rb | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.env.production.sample b/.env.production.sample index b3ef77aaf131a5..0e52d3b648ddc9 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -323,6 +323,13 @@ FETCH_REPLIES_DEBOUNCE=15 # Period to wait after a post is first created before fetching its replies (in minutes) FETCH_REPLIES_CREATED_RECENTLY=5 +# Max number of replies to fetch - total, recursively through a whole reply tree +FETCH_REPLIES_MAX_GLOBAL=1000 + +# Max number of replies to fetch - for a single post +FETCH_REPLIES_MAX_SINGLE=500 + + # IP and session retention # ----------------------- # Make sure to modify the scheduling of ip_cleanup_scheduler in config/sidekiq.yml diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 292d5aad301c01..491a807bea5f27 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -4,7 +4,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService include JsonLdHelper # Limit of replies to fetch per status - MAX_REPLIES = 500 + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) @allow_synchronous_requests = allow_synchronous_requests diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index c739cafbb183ac..b641ef62bcb4b7 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -12,7 +12,7 @@ class ActivityPub::FetchAllRepliesWorker sidekiq_options queue: 'pull', retry: 3 # Global max replies to fetch per request (all replies, recursively) - MAX_REPLIES = 1000 + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) From ab35055e38c6e210c0df34ec8f7a4ce12216ce5d Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 14 Oct 2024 22:17:14 -0700 Subject: [PATCH 21/22] remove redundant params - forgot i subclassed --- app/services/activitypub/fetch_all_replies_service.rb | 2 +- app/services/activitypub/fetch_replies_service.rb | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 491a807bea5f27..09143127d5464f 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -10,7 +10,7 @@ def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false - @items = collection_items(collection_or_uri, fetch_all: true) + @items = collection_items(collection_or_uri) @items = filtered_replies return if @items.nil? diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 2963f616e2882b..bb20e666498668 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -5,8 +5,6 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies MAX_REPLIES = 5 - # Limit when fetching all (to prevent infinite fetch attack) - FETCH_ALL_MAX_REPLIES = 500 def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true) @account = parent_status.account @@ -23,7 +21,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req private - def collection_items(collection_or_uri, fetch_all: false) + def collection_items(collection_or_uri) collection = fetch_collection(collection_or_uri) return unless collection.is_a?(Hash) @@ -41,8 +39,7 @@ def collection_items(collection_or_uri, fetch_all: false) all_items.concat(as_array(items)) - # Quit early if we are not fetching all replies or we've reached the absolute max - break if (!fetch_all && all_items.size >= MAX_REPLIES) || (all_items.size >= FETCH_ALL_MAX_REPLIES) + break if all_items.size > MAX_REPLIES collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end From 8048524ed0a919a6751ac946fec9a8afadc51fb5 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Fri, 18 Oct 2024 23:07:15 -0700 Subject: [PATCH 22/22] add some more debugging messages, catch errors in fetching child uris, deduplicate uris to fetch, prevent multiple fetch-alls for the parent status --- .../activitypub/fetch_all_replies_service.rb | 6 +++- .../activitypub/fetch_all_replies_worker.rb | 28 ++++++++++++++----- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 09143127d5464f..50be4c9e8b4a58 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -9,6 +9,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false + @collection_or_uri = collection_or_uri @items = collection_items(collection_or_uri) @items = filtered_replies @@ -40,6 +41,9 @@ def filtered_replies Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) # Reject all statuses that we already have in the db - uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + uris = uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + + Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" } + uris end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index b641ef62bcb4b7..0f4fd83caf6a78 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -16,12 +16,15 @@ class ActivityPub::FetchAllRepliesWorker def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) - Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } uris_to_fetch = get_replies(@parent_status.uri, options) - fetched_uris = uris_to_fetch.clone return if uris_to_fetch.nil? + @parent_status.touch(:fetched_replies_at) + + fetched_uris = uris_to_fetch.clone.to_set + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES next_reply = uris_to_fetch.pop next if next_reply.nil? @@ -29,8 +32,10 @@ def perform(parent_status_id, options = {}) new_reply_uris = get_replies(next_reply, options) next if new_reply_uris.nil? + new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) } + uris_to_fetch.concat(new_reply_uris) - fetched_uris.concat(new_reply_uris) + fetched_uris = fetched_uris.merge(new_reply_uris) end Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } @@ -49,11 +54,20 @@ def get_replies(status_uri, options = {}) def get_replies_uri(parent_status_uri) begin json_status = fetch_resource(parent_status_uri, true) - replies_collection_or_uri = json_status['replies'] - Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_collection_or_uri.nil? - replies_collection_or_uri + if json_status.nil? + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: error getting replies URI for #{parent_status_uri}, returned nil" } + nil + elsif !json_status.key?('replies') + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: no replies collection found in ActivityPub object: #{json_status}" } + nil + else + json_status['replies'] + end rescue => e - Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status_id}: Got exception while resolving replies URI: #{e} - #{e.message}" } + Rails.logger.warn { "FetchAllRepliesWorker - #{@parent_status.uri}: caught exception fetching replies URI: #{e}" } + # Raise if we can't get the collection for top-level status to trigger retry + raise e if parent_status_uri == @parent_status.uri + nil end end