diff --git a/.env.production.sample b/.env.production.sample index 83e2b198d22879..0e52d3b648ddc9 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -312,6 +312,24 @@ MAX_POLL_OPTION_CHARS=100 # New registrations will automatically follow these accounts (separated by commas) AUTOFOLLOW= +# -- Fetch all replies settings -- +# When a user expands a post (DetailedStatus view), fetch all of its replies +# (default: true if unset, set explicitly to ``false`` to disable) +FETCH_REPLIES_ENABLED=true + +# Period to wait between fetching replies (in minutes) +FETCH_REPLIES_DEBOUNCE=15 + +# Period to wait after a post is first created before fetching its replies (in minutes) +FETCH_REPLIES_CREATED_RECENTLY=5 + +# Max number of replies to fetch - total, recursively through a whole reply tree +FETCH_REPLIES_MAX_GLOBAL=1000 + +# Max number of replies to fetch - for a single post +FETCH_REPLIES_MAX_SINGLE=500 + + # IP and session retention # ----------------------- # Make sure to modify the scheduling of ip_cleanup_scheduler in config/sidekiq.yml diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 2593ef7da582b3..c9ce8e348c00ae 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -58,6 +58,15 @@ def context statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) + + if !current_account.nil? && @status.should_fetch_replies? + ActivityPub::FetchAllRepliesWorker.perform_async( + @status.id, + { + allow_synchronous_requests: true, + } + ) + end end def create diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb new file mode 100644 index 00000000000000..62c908840a72e2 --- /dev/null +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Status::FetchRepliesConcern + extend ActiveSupport::Concern + + # enable/disable fetching all replies + FETCH_REPLIES_ENABLED = ENV.key?('FETCH_REPLIES_ENABLED') ? ENV['FETCH_REPLIES_ENABLED'] == 'true' : true + + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = (ENV['FETCH_REPLIES_DEBOUNCE'] || 15).to_i.minutes + CREATED_RECENTLY_DEBOUNCE = (ENV['FETCH_REPLIES_CREATED_RECENTLY'] || 5).to_i.minutes + + included do + scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) } + scope :not_created_recently, -> { where(created_at: ..CREATED_RECENTLY_DEBOUNCE.ago) } + scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_DEBOUNCE.ago..) } + scope :not_fetched_recently, -> { where(fetched_replies_at: ..FETCH_REPLIES_DEBOUNCE.ago).or(where(fetched_replies_at: nil)) } + + scope :shouldnt_fetch_replies, -> { local.merge(created_recently).merge(fetched_recently) } + scope :should_fetch_replies, -> { local.invert_where.merge(not_created_recently).merge(not_fetched_recently) } + end + + def should_fetch_replies? + # we aren't brand new, and we haven't fetched replies since the debounce window + FETCH_REPLIES_ENABLED && !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && ( + fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago + ) + end +end diff --git a/app/models/status.rb b/app/models/status.rb index 7bde0438a6a308..39b4f791531aa6 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -29,6 +29,7 @@ # edited_at :datetime # trendable :boolean # ordered_media_attachment_ids :bigint(8) is an Array +# fetched_replies_at :datetime # class Status < ApplicationRecord @@ -36,6 +37,7 @@ class Status < ApplicationRecord include Discard::Model include Paginable include RateLimitable + include Status::FetchRepliesConcern include Status::SafeReblogInsert include Status::SearchConcern include Status::SnapshotConcern diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb new file mode 100644 index 00000000000000..50be4c9e8b4a58 --- /dev/null +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService + include JsonLdHelper + + # Limit of replies to fetch per status + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i + + def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) + @allow_synchronous_requests = allow_synchronous_requests + @filter_by_host = false + @collection_or_uri = collection_or_uri + + @items = collection_items(collection_or_uri) + @items = filtered_replies + return if @items.nil? + + FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + + @items + end + + private + + def filtered_replies + return if @items.nil? + + # Find all statuses that we *shouldn't* update the replies for, and use that as a filter. + # We don't assume that we have the statuses before they're created, + # hence the negative filter - + # "keep all these uris except the ones we already have" + # instead of + # "keep all these uris that match some conditions on existing Status objects" + # + # Typically we assume the number of replies we *shouldn't* fetch is smaller than the + # replies we *should* fetch, so we also minimize the number of uris we should load here. + uris = @items.map { |item| value_or_id(item) } + dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) + + # touch all statuses that already exist and that we're about to update + Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) + + # Reject all statuses that we already have in the db + uris = uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + + Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" } + uris + end +end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 46cab6caf93dc3..bb20e666498668 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -3,9 +3,13 @@ class ActivityPub::FetchRepliesService < BaseService include JsonLdHelper - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil) + # Limit of fetched replies + MAX_REPLIES = 5 + + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true) @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests + @filter_by_host = filter_by_host @items = collection_items(collection_or_uri) return if @items.nil? @@ -24,18 +28,29 @@ def collection_items(collection_or_uri) collection = fetch_collection(collection['first']) if collection['first'].present? return unless collection.is_a?(Hash) - case collection['type'] - when 'Collection', 'CollectionPage' - as_array(collection['items']) - when 'OrderedCollection', 'OrderedCollectionPage' - as_array(collection['orderedItems']) + all_items = [] + while collection.is_a?(Hash) + items = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + + all_items.concat(as_array(items)) + + break if all_items.size > MAX_REPLIES + + collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end + + all_items end def fetch_collection(collection_or_uri) return collection_or_uri if collection_or_uri.is_a?(Hash) return unless @allow_synchronous_requests - return if non_matching_uri_hosts?(@account.uri, collection_or_uri) + return if @filter_by_host && non_matching_uri_hosts?(@account.uri, collection_or_uri) # NOTE: For backward compatibility reasons, Mastodon signs outgoing # queries incorrectly by default. @@ -54,10 +69,14 @@ def fetch_collection(collection_or_uri) end def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. + if @filter_by_host + # Only fetch replies to the same server as the original status to avoid + # amplification attacks. - # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(5) + # Also limit to 5 fetched replies to limit potential for DoS. + @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES) + else + @items.map { |item| value_or_id(item) }.take(MAX_REPLIES) + end end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb new file mode 100644 index 00000000000000..0f4fd83caf6a78 --- /dev/null +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +# Fetch all replies to a status, querying recursively through +# ActivityPub replies collections, fetching any statuses that +# we either don't already have or we haven't checked for new replies +# in the Status::FETCH_REPLIES_DEBOUNCE interval +class ActivityPub::FetchAllRepliesWorker + include Sidekiq::Worker + include ExponentialBackoff + include JsonLdHelper + + sidekiq_options queue: 'pull', retry: 3 + + # Global max replies to fetch per request (all replies, recursively) + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i + + def perform(parent_status_id, options = {}) + @parent_status = Status.find(parent_status_id) + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } + + uris_to_fetch = get_replies(@parent_status.uri, options) + return if uris_to_fetch.nil? + + @parent_status.touch(:fetched_replies_at) + + fetched_uris = uris_to_fetch.clone.to_set + + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES + next_reply = uris_to_fetch.pop + next if next_reply.nil? + + new_reply_uris = get_replies(next_reply, options) + next if new_reply_uris.nil? + + new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) } + + uris_to_fetch.concat(new_reply_uris) + fetched_uris = fetched_uris.merge(new_reply_uris) + end + + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } + fetched_uris + end + + private + + def get_replies(status_uri, options = {}) + replies_collection_or_uri = get_replies_uri(status_uri) + return if replies_collection_or_uri.nil? + + ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, **options.deep_symbolize_keys) + end + + def get_replies_uri(parent_status_uri) + begin + json_status = fetch_resource(parent_status_uri, true) + if json_status.nil? + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: error getting replies URI for #{parent_status_uri}, returned nil" } + nil + elsif !json_status.key?('replies') + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: no replies collection found in ActivityPub object: #{json_status}" } + nil + else + json_status['replies'] + end + rescue => e + Rails.logger.warn { "FetchAllRepliesWorker - #{@parent_status.uri}: caught exception fetching replies URI: #{e}" } + # Raise if we can't get the collection for top-level status to trigger retry + raise e if parent_status_uri == @parent_status.uri + + nil + end + end +end diff --git a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb new file mode 100644 index 00000000000000..229e43d978c493 --- /dev/null +++ b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1] + def change + add_column :statuses, :fetched_replies_at, :datetime, null: true + end +end diff --git a/db/schema.rb b/db/schema.rb index 6335f7f31ac210..2a0267bfc33e6b 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -553,12 +553,12 @@ end create_table "ip_blocks", force: :cascade do |t| + t.datetime "created_at", precision: nil, null: false + t.datetime "updated_at", precision: nil, null: false + t.datetime "expires_at", precision: nil t.inet "ip", default: "0.0.0.0", null: false t.integer "severity", default: 0, null: false - t.datetime "expires_at", precision: nil t.text "comment", default: "", null: false - t.datetime "created_at", precision: nil, null: false - t.datetime "updated_at", precision: nil, null: false t.index ["ip"], name: "index_ip_blocks_on_ip", unique: true end @@ -1052,6 +1052,7 @@ t.datetime "edited_at", precision: nil t.boolean "trendable" t.bigint "ordered_media_attachment_ids", array: true + t.datetime "fetched_replies_at" t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)" t.index ["account_id"], name: "index_statuses_on_account_id" t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)" @@ -1380,9 +1381,9 @@ add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true create_view "user_ips", sql_definition: <<-SQL - SELECT user_id, - ip, - max(used_at) AS used_at + SELECT t0.user_id, + t0.ip, + max(t0.used_at) AS used_at FROM ( SELECT users.id AS user_id, users.sign_up_ip AS ip, users.created_at AS used_at @@ -1399,7 +1400,7 @@ login_activities.created_at FROM login_activities WHERE (login_activities.success = true)) t0 - GROUP BY user_id, ip; + GROUP BY t0.user_id, t0.ip; SQL create_view "account_summaries", materialized: true, sql_definition: <<-SQL SELECT accounts.id AS account_id, @@ -1420,9 +1421,9 @@ add_index "account_summaries", ["account_id"], name: "index_account_summaries_on_account_id", unique: true create_view "global_follow_recommendations", materialized: true, sql_definition: <<-SQL - SELECT account_id, - sum(rank) AS rank, - array_agg(reason) AS reason + SELECT t0.account_id, + sum(t0.rank) AS rank, + array_agg(t0.reason) AS reason FROM ( SELECT account_summaries.account_id, ((count(follows.id))::numeric / (1.0 + (count(follows.id))::numeric)) AS rank, 'most_followed'::text AS reason @@ -1446,8 +1447,8 @@ WHERE (follow_recommendation_suppressions.account_id = statuses.account_id))))) GROUP BY account_summaries.account_id HAVING (sum((status_stats.reblogs_count + status_stats.favourites_count)) >= (5)::numeric)) t0 - GROUP BY account_id - ORDER BY (sum(rank)) DESC; + GROUP BY t0.account_id + ORDER BY (sum(t0.rank)) DESC; SQL add_index "global_follow_recommendations", ["account_id"], name: "index_global_follow_recommendations_on_account_id", unique: true diff --git a/spec/services/activitypub/fetch_all_replies_service_spec.rb b/spec/services/activitypub/fetch_all_replies_service_spec.rb new file mode 100644 index 00000000000000..5ab1536d519f9b --- /dev/null +++ b/spec/services/activitypub/fetch_all_replies_service_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesService do + subject { described_class.new } + + let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account') } + let(:status) { Fabricate(:status, account: actor) } + let(:collection_uri) { 'http://example.com/replies/1' } + + let(:items) do + [ + 'http://example.com/self-reply-1', + 'http://example.com/self-reply-2', + 'http://example.com/self-reply-3', + 'http://other.com/other-reply-1', + 'http://other.com/other-reply-2', + 'http://other.com/other-reply-3', + 'http://example.com/self-reply-4', + 'http://example.com/self-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'Collection', + id: collection_uri, + items: items, + }.with_indifferent_access + end + + describe '#call' do + it 'fetches more than the default maximum and from multiple domains' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-2 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 + http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + + context 'with a recent status' do + before do + Fabricate(:status, uri: 'http://example.com/self-reply-2', fetched_replies_at: 1.second.ago, local: false) + end + + it 'skips statuses that have been updated recently' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + end + + context 'with an old status' do + before do + Fabricate(:status, uri: 'http://other.com/other-reply-1', fetched_replies_at: 1.year.ago, created_at: 1.year.ago, account: actor) + end + + it 'updates the time that fetched statuses were last fetched' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(Status.find_by(uri: 'http://other.com/other-reply-1').fetched_replies_at).to be >= 1.minute.ago + end + end + end +end diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb new file mode 100644 index 00000000000000..35fadb6caf1497 --- /dev/null +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -0,0 +1,254 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesWorker do + subject { described_class.new } + + let(:top_items) do + [ + 'http://example.com/self-reply-1', + 'http://other.com/other-reply-2', + 'http://example.com/self-reply-3', + ] + end + + let(:top_items_paged) do + [ + 'http://example.com/self-reply-4', + 'http://other.com/other-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:nested_items) do + [ + 'http://example.com/nested-self-reply-1', + 'http://other.com/nested-other-reply-2', + 'http://example.com/nested-self-reply-3', + ] + end + + let(:nested_items_paged) do + [ + 'http://example.com/nested-self-reply-4', + 'http://other.com/nested-other-reply-5', + 'http://example.com/nested-self-reply-6', + ] + end + + let(:all_items) do + top_items + top_items_paged + nested_items + nested_items_paged + end + + let(:top_note_uri) do + 'http://example.com/top-post' + end + + let(:top_collection_uri) do + 'http://example.com/top-post/replies' + end + + # The reply uri that has the nested replies under it + let(:reply_note_uri) do + 'http://other.com/other-reply-2' + end + + # The collection uri of nested replies + let(:reply_collection_uri) do + 'http://other.com/other-reply-2/replies' + end + + let(:replies_top) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_collection_uri, + type: 'Collection', + items: top_items + top_items_paged, + } + end + + let(:replies_nested) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_collection_uri, + type: 'Collection', + items: nested_items + nested_items_paged, + } + end + + # The status resource for the top post + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_top, + attributedTo: 'https://example.com', + } + end + + # The status resource that has the uri to the replies collection + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_nested, + attributedTo: 'https://other.com', + } + end + + let(:empty_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://example.com/empty', + type: 'Note', + content: 'Lorem ipsum', + replies: [], + attributedTo: 'https://example.com', + } + end + + let(:account) { Fabricate(:account, domain: 'example.com') } + let(:status) { Fabricate(:status, account: account, uri: top_note_uri) } + + before do + allow(FetchReplyWorker).to receive(:push_bulk) + all_items.each do |item| + next if [top_note_uri, reply_note_uri].include? item + + stub_request(:get, item).to_return(status: 200, body: Oj.dump(empty_object), headers: { 'Content-Type': 'application/activity+json' }) + end + end + + shared_examples 'fetches all replies' do + before do + stub_request(:get, top_note_uri).to_return(status: 200, body: Oj.dump(top_object), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_note_uri).to_return(status: 200, body: Oj.dump(reply_object), headers: { 'Content-Type': 'application/activity+json' }) + end + + it 'fetches statuses recursively' do + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(all_items) + end + + it 'respects the maxium limits set by not recursing after the max is reached' do + stub_const('ActivityPub::FetchAllRepliesWorker::MAX_REPLIES', 5) + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(top_items + top_items_paged) + end + end + + describe 'perform' do + context 'when the payload is a Note with replies as a Collection of inlined replies' do + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a URI to a Collection' do + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: top_collection_uri, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: reply_collection_uri, + attributedTo: 'https://other.com', + } + end + + before do + stub_request(:get, top_collection_uri).to_return(status: 200, body: Oj.dump(replies_top), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_collection_uri).to_return(status: 200, body: Oj.dump(replies_nested), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a paginated collection' do + let(:top_page_2_uri) do + "#{top_collection_uri}/2" + end + + let(:reply_page_2_uri) do + "#{reply_collection_uri}/2" + end + + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: top_collection_uri, + first: { + type: 'CollectionPage', + partOf: top_collection_uri, + items: top_items, + next: top_page_2_uri, + }, + }, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: reply_collection_uri, + first: { + type: 'CollectionPage', + partOf: reply_collection_uri, + items: nested_items, + next: reply_page_2_uri, + }, + }, + attributedTo: 'https://other.com', + } + end + + let(:top_page_two) do + { + type: 'CollectionPage', + id: top_page_2_uri, + partOf: top_collection_uri, + items: top_items_paged, + } + end + + let(:reply_page_two) do + { + type: 'CollectionPage', + id: reply_page_2_uri, + partOf: reply_collection_uri, + items: nested_items_paged, + } + end + + before do + stub_request(:get, top_page_2_uri).to_return(status: 200, body: Oj.dump(top_page_two), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_page_2_uri).to_return(status: 200, body: Oj.dump(reply_page_two), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + end + end +end