Skip to content

Commit

Permalink
Ensure otel test will wait until after task fails to send update
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Oct 25, 2023
1 parent 020f663 commit 6670ee5
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions tests/contrib/test_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.contrib.opentelemetry import workflow as otel_workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from temporalio.worker import UnsandboxedWorkflowRunner, Worker


@dataclass
Expand Down Expand Up @@ -48,6 +48,7 @@ class TracingWorkflowAction:
activity: Optional[TracingWorkflowActionActivity] = None
continue_as_new: Optional[TracingWorkflowActionContinueAsNew] = None
wait_until_signal_count: int = 0
wait_and_do_update: bool = False


@dataclass
Expand All @@ -71,10 +72,14 @@ class TracingWorkflowActionContinueAsNew:
param: TracingWorkflowParam


ready_for_update: asyncio.Semaphore


@workflow.defn
class TracingWorkflow:
def __init__(self) -> None:
self._signal_count = 0
self._did_update = False

@workflow.run
async def run(self, param: TracingWorkflowParam) -> None:
Expand Down Expand Up @@ -126,6 +131,9 @@ async def run(self, param: TracingWorkflowParam) -> None:
await workflow.wait_condition(
lambda: self._signal_count >= action.wait_until_signal_count
)
if action.wait_and_do_update:
ready_for_update.release()
await workflow.wait_condition(lambda: self._did_update)

async def _raise_on_non_replay(self) -> None:
replaying = workflow.unsafe.is_replaying()
Expand All @@ -144,13 +152,11 @@ def signal(self) -> None:
self._signal_count += 1

@workflow.update
def update(self) -> int:
self._signal_count += 1
return self._signal_count
def update(self) -> None:
self._did_update = True

@update.validator
def update_validator(self) -> None:
print("Actually in validator")
pass


Expand All @@ -160,6 +166,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
pytest.skip(
"Java test server: https://github.com/temporalio/sdk-java/issues/1424"
)
global ready_for_update
ready_for_update = asyncio.Semaphore(0)
# Create a tracer that has an in-memory exporter
exporter = InMemorySpanExporter()
provider = TracerProvider()
Expand All @@ -176,6 +184,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
task_queue=task_queue,
workflows=[TracingWorkflow],
activities=[tracing_activity],
# Needed so we can wait to send update at the right time
workflow_runner=UnsandboxedWorkflowRunner(),
):
# Run workflow with various actions
workflow_id = f"workflow_{uuid.uuid4()}"
Expand All @@ -185,15 +195,17 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
actions=[
# First fail on replay
TracingWorkflowAction(fail_on_non_replay=True),
# Wait for a signal & update
TracingWorkflowAction(wait_until_signal_count=2),
# Wait for a signal
TracingWorkflowAction(wait_until_signal_count=1),
# Exec activity that fails task before complete
TracingWorkflowAction(
activity=TracingWorkflowActionActivity(
param=TracingActivityParam(fail_until_attempt=2),
fail_on_non_replay_before_complete=True,
),
),
# Wait for update
TracingWorkflowAction(wait_and_do_update=True),
# Exec child workflow that fails task before complete
TracingWorkflowAction(
child_workflow=TracingWorkflowActionChildWorkflow(
Expand Down Expand Up @@ -240,7 +252,10 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
# Send query, then signal to move it along
assert "some query" == await handle.query(TracingWorkflow.query)
await handle.signal(TracingWorkflow.signal)
await handle.execute_update(TracingWorkflow.update)
# Wait to send the update until after the things that fail tasks are over, as failing a task while the update
# is running can mean we execute it twice, which will mess up our spans.
async with ready_for_update:
await handle.execute_update(TracingWorkflow.update)
await handle.result()

# Dump debug with attributes, but do string assertion test without
Expand All @@ -253,11 +268,11 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
" RunWorkflow:TracingWorkflow",
" MyCustomSpan",
" HandleSignal:signal (links: SignalWorkflow:signal)",
" ValidateUpdate:update (links: StartWorkflowUpdate:update)",
" HandleUpdate:update (links: StartWorkflowUpdate:update)",
" StartActivity:tracing_activity",
" RunActivity:tracing_activity",
" RunActivity:tracing_activity",
" ValidateUpdate:update (links: StartWorkflowUpdate:update)",
" HandleUpdate:update (links: StartWorkflowUpdate:update)",
" StartChildWorkflow:TracingWorkflow",
" RunWorkflow:TracingWorkflow",
" MyCustomSpan",
Expand Down

0 comments on commit 6670ee5

Please sign in to comment.