From 636eecee32e6aef4314a91f31d2cea3e8080ac86 Mon Sep 17 00:00:00 2001 From: iekundayo Date: Fri, 1 Nov 2024 10:44:36 -0700 Subject: [PATCH 1/2] implement long poll option for get_workflow_execution_history --- lib/temporal/connection/grpc.rb | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 1f817f51..6c9729fd 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -187,8 +187,18 @@ def get_workflow_execution_history( wait_new_event: wait_for_new_event, history_event_filter_type: HISTORY_EVENT_FILTER[event_type] ) - deadline = timeout ? Time.now + timeout : nil - client.get_workflow_execution_history(request, deadline: deadline) + + loop do + deadline = timeout ? Time.now + timeout : nil + response = client.get_workflow_execution_history(request, deadline: deadline) + + if wait_for_new_event && response.history.events.empty? && !response.next_page_token.empty? + request.next_page_token = response.next_page_token + next + end + + return response + end end def poll_workflow_task_queue(namespace:, task_queue:, binary_checksum:) From 298059422bb2fb3291f476942a22fa4f207243a0 Mon Sep 17 00:00:00 2001 From: iekundayo Date: Fri, 1 Nov 2024 11:08:12 -0700 Subject: [PATCH 2/2] add unit test --- spec/unit/lib/temporal/client_spec.rb | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index a0ba4dc1..5872439f 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -556,6 +556,30 @@ class NamespacedWorkflow < Temporal::Workflow ) end.to raise_error(Temporal::TimeoutError) end + + it 'recurring polling until a close event is received' + completed_event = Fabricate(:workflow_completed_event, result: nil) + response_with_no_closed_event = Fabricate(:workflow_execution_history, events: [], _next_page_token: 'test_token') + response_with_closed_event = Fabricate(:workflow_execution_history, events: [completed_event]) + + expect(connection) + .to receive(:get_workflow_execution_history) + .with( + namespace: 'default-test-namespace', + workflow_id: workflow_id, + run_id: run_id, + wait_for_new_event: true, + event_type: :close, + timeout: 30, + ) + .and_return(response_with_no_closed_event, response_with_closed_event) + + subject.await_workflow_result( + TestStartWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + end end describe '#reset_workflow' do