Skip to content

Commit

Permalink
More client work (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Sep 13, 2024
1 parent ff47b2e commit 83bea60
Show file tree
Hide file tree
Showing 61 changed files with 3,536 additions and 492 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ To build shared library for development use:
Note, this is not `compile:dev` because debug-mode in Rust has
[an issue](https://github.com/rust-lang/rust/issues/34283) that causes runtime stack size problems.

To build and test release:
To lint, build, and test release:

bundle exec rake

Expand Down
6 changes: 5 additions & 1 deletion temporalio/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Layout/ClassStructure:

# The default is too small and triggers simply setting lots of values on a proto
Metrics/AbcSize:
Max: 75
Max: 200

# The default is too small
Metrics/BlockLength:
Expand All @@ -44,6 +44,10 @@ Metrics/CyclomaticComplexity:
Metrics/MethodLength:
Max: 100

# The default is too small
Metrics/ModuleLength:
Max: 1000

# The default is too small
Metrics/PerceivedComplexity:
Max: 25
Expand Down
15 changes: 13 additions & 2 deletions temporalio/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ Steep::RakeTask.new

require 'yard'

YARD::Rake::YardocTask.new
module CustomizeYardWarnings # rubocop:disable Style/Documentation
def process
super
rescue YARD::Parser::UndocumentableError
# We ignore if it's an API warning
raise unless statement.last.file.start_with?('lib/temporalio/api/')
end
end

YARD::Handlers::Ruby::ConstantHandler.prepend(CustomizeYardWarnings)

YARD::Rake::YardocTask.new { |t| t.options = ['--fail-on-warning'] }

require 'fileutils'
require 'google/protobuf'
Expand Down Expand Up @@ -321,4 +332,4 @@ Rake::Task[:build].enhance([:copy_parent_files]) do
rm ['LICENSE', 'README.md']
end

task default: ['rubocop', 'compile', 'rbs:install_collection', 'steep', 'test']
task default: ['rubocop', 'yard', 'compile', 'rbs:install_collection', 'steep', 'test']
177 changes: 75 additions & 102 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

require 'google/protobuf/well_known_types'
require 'temporalio/api'
require 'temporalio/client/async_activity_handle'
require 'temporalio/client/connection'
require 'temporalio/client/implementation'
require 'temporalio/client/interceptor'
require 'temporalio/client/workflow_execution'
require 'temporalio/client/workflow_execution_count'
require 'temporalio/client/workflow_handle'
require 'temporalio/client/workflow_query_reject_condition'
require 'temporalio/common_enums'
require 'temporalio/converters'
require 'temporalio/error'
require 'temporalio/internal/proto_utils'
require 'temporalio/retry_policy'
require 'temporalio/runtime'
require 'temporalio/search_attributes'

Expand Down Expand Up @@ -45,10 +50,10 @@ class Client
# TLS options are present, those TLS options will be used.
# @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from payloads.
# @param interceptors [Array<Interceptor>] Set of interceptors that are chained together to allow intercepting of
# client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement
# {Worker::Interceptor} will be used as worker interceptors too so they should not be given separately when
# creating a worker.
# @param default_workflow_query_reject_condition [Api::Enums::V1::QueryRejectCondition, nil] Default rejection
# client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker
# interceptor will be used as worker interceptors too so they should not be given separately when creating a
# worker.
# @param default_workflow_query_reject_condition [WorkflowQueryRejectCondition, nil] Default rejection
# condition for workflow queries if not set during query. See {WorkflowHandle.query} for details on the
# rejection condition.
# @param rpc_metadata [Hash<String, String>] Headers to use for all calls to the server. Keys here can be overriden
Expand Down Expand Up @@ -116,11 +121,10 @@ def self.connect(
# @param interceptors [Array<Interceptor>] Set of interceptors that are chained together to allow intercepting of
# client calls. The earlier interceptors wrap the later ones.
#
# Any interceptors that also implement {Worker::Interceptor} will be used as worker interceptors too so they
# should not be given separately when creating a worker.
# @param default_workflow_query_reject_condition [Api::Enums::V1::QueryRejectCondition, nil] Default rejection
# condition for workflow queries if not set during query. See {WorkflowHandle.query} for details on the
# rejection condition.
# Any interceptors that also implement worker interceptor will be used as worker interceptors too so they should
# not be given separately when creating a worker.
# @param default_workflow_query_reject_condition [WorkflowQueryRejectCondition, nil] Default rejection condition for
# workflow queries if not set during query. See {WorkflowHandle.query} for details on the rejection condition.
#
# @see connect
def initialize(
Expand Down Expand Up @@ -218,7 +222,7 @@ def start_workflow(
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
workflow:,
args:,
id:,
workflow_id: id,
task_queue:,
execution_timeout:,
run_timeout:,
Expand Down Expand Up @@ -264,7 +268,7 @@ def start_workflow(
#
# @return [Object] Successful result of the workflow.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists.
# @raise [Error::WorkflowFailureError] Workflow failed with {Error::WorkflowFailureError.cause} as cause.
# @raise [Error::WorkflowFailureError] Workflow failed with +cause+ as the cause.
# @raise [Error::RPCError] RPC error from call.
def execute_workflow(
workflow,
Expand Down Expand Up @@ -320,103 +324,72 @@ def workflow_handle(
run_id: nil,
first_execution_run_id: nil
)
WorkflowHandle.new(self, workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
end

# @!visibility private
def _impl
@impl
# List workflows.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param rpc_metadata [Hash<String, String>, nil] Headers to include on the RPC call.
# @param rpc_timeout [Float, nil] Number of seconds before timeout.
#
# @return [Enumerator<WorkflowExecution>] Enumerable workflow executions.
#
# @raise [Error::RPCError] RPC error from call.
#
# @see https://docs.temporal.io/visibility
def list_workflows(
query = nil,
rpc_metadata: nil,
rpc_timeout: nil
)
@impl.list_workflows(Interceptor::ListWorkflowsInput.new(
query:,
rpc_metadata:,
rpc_timeout:
))
end

# @!visibility private
class Implementation < Interceptor::Outbound
def initialize(client)
super(nil)
@client = client
end

# @!visibility private
def start_workflow(input)
# TODO(cretz): Signal/update with start
req = Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
request_id: SecureRandom.uuid,
namespace: @client.namespace,
workflow_type: Api::Common::V1::WorkflowType.new(name: input.workflow.to_s),
workflow_id: input.id,
task_queue: Api::TaskQueue::V1::TaskQueue.new(name: input.task_queue.to_s),
input: @client.data_converter.to_payloads(input.args),
workflow_execution_timeout: Internal::ProtoUtils.seconds_to_duration(input.execution_timeout),
workflow_run_timeout: Internal::ProtoUtils.seconds_to_duration(input.run_timeout),
workflow_task_timeout: Internal::ProtoUtils.seconds_to_duration(input.task_timeout),
identity: @client.connection.identity,
workflow_id_reuse_policy: input.id_reuse_policy,
workflow_id_conflict_policy: input.id_conflict_policy,
retry_policy: input.retry_policy&.to_proto,
cron_schedule: input.cron_schedule,
memo: Internal::ProtoUtils.memo_to_proto(input.memo, @client.data_converter),
search_attributes: input.search_attributes&.to_proto,
workflow_start_delay: Internal::ProtoUtils.seconds_to_duration(input.start_delay),
request_eager_execution: input.request_eager_start,
header: input.headers
)

# Send request
begin
resp = @client.workflow_service.start_workflow_execution(
req,
rpc_retry: true,
rpc_metadata: input.rpc_metadata,
rpc_timeout: input.rpc_timeout
)
rescue Error::RPCError => e
# Unpack and raise already started if that's the error, otherwise default raise
if e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first
details = e.grpc_status.details.first.unpack(Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure)
if details
raise Error::WorkflowAlreadyStartedError.new(
workflow_id: req.workflow_id,
workflow_type: req.workflow_type.name,
run_id: details.run_id
)
end
end
raise
end
# Count workflows.
#
# @param query [String, nil] A Temporal visibility list filter.
# @param rpc_metadata [Hash<String, String>, nil] Headers to include on the RPC call.
# @param rpc_timeout [Float, nil] Number of seconds before timeout.
#
# @return [WorkflowExecutionCount] Count of workflows.
#
# @raise [Error::RPCError] RPC error from call.
#
# @see https://docs.temporal.io/visibility
def count_workflows(
query = nil,
rpc_metadata: nil,
rpc_timeout: nil
)
@impl.count_workflows(Interceptor::CountWorkflowsInput.new(
query:,
rpc_metadata:,
rpc_timeout:
))
end

# Return handle
WorkflowHandle.new(
@client,
input.id,
result_run_id: resp.run_id,
first_execution_run_id: resp.run_id
)
# Get an async activity handle.
#
# @param task_token_or_id_reference [String, ActivityIDReference] Task token string or activity ID reference.
# @return [AsyncActivityHandle]
def async_activity_handle(task_token_or_id_reference)
if task_token_or_id_reference.is_a?(ActivityIDReference)
AsyncActivityHandle.new(client: self, task_token: nil, id_reference: task_token_or_id_reference)
elsif task_token_or_id_reference.is_a?(String)
AsyncActivityHandle.new(client: self, task_token: task_token_or_id_reference, id_reference: nil)
else
raise ArgumentError, 'Must be a string task token or an ActivityIDReference'
end
end

# @!visibility private
def fetch_workflow_history_event_page(input)
req = Api::WorkflowService::V1::GetWorkflowExecutionHistoryRequest.new(
namespace: @client.namespace,
execution: Api::Common::V1::WorkflowExecution.new(
workflow_id: input.id,
run_id: input.run_id || ''
),
maximum_page_size: input.page_size || 0,
next_page_token: input.next_page_token,
wait_new_event: input.wait_new_event,
history_event_filter_type: input.event_filter_type,
skip_archival: input.skip_archival
)
resp = @client.workflow_service.get_workflow_execution_history(
req,
rpc_retry: true,
rpc_metadata: input.rpc_metadata,
rpc_timeout: input.rpc_timeout
)
Interceptor::FetchWorkflowHistoryEventPage.new(
events: resp.history&.events || [],
next_page_token: resp.next_page_token.empty? ? nil : resp.next_page_token
)
end
# @!visibility private
def _impl
@impl
end
end
end
32 changes: 32 additions & 0 deletions temporalio/lib/temporalio/client/activity_id_reference.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

require 'temporalio/api'
require 'temporalio/client/interceptor'
require 'temporalio/error'

module Temporalio
class Client
# Reference to an existing activity by its workflow ID, run ID, and activity ID.
class ActivityIDReference
# @return [String] ID for the workflow.
attr_reader :workflow_id

# @return [String, nil] Run ID for the workflow.
attr_reader :run_id

# @return [String] ID for the activity.
attr_reader :activity_id

# Create an activity ID reference.
#
# @param workflow_id [String] ID for the workflow.
# @param run_id [String, nil] Run ID for the workflow.
# @param activity_id [String] ID for the workflow.
def initialize(workflow_id:, run_id:, activity_id:)
@workflow_id = workflow_id
@run_id = run_id
@activity_id = activity_id
end
end
end
end
Loading

0 comments on commit 83bea60

Please sign in to comment.