Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move from Temporalite to Temporal CLI dev server #339

Merged
merged 1 commit into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {

// Testing stuff
m.add_class::<testing::EphemeralServerRef>()?;
m.add_function(wrap_pyfunction!(start_temporalite, m)?)?;
m.add_function(wrap_pyfunction!(start_dev_server, m)?)?;
m.add_function(wrap_pyfunction!(start_test_server, m)?)?;

// Worker stuff
Expand Down Expand Up @@ -55,12 +55,12 @@ fn raise_in_thread<'a>(py: Python<'a>, thread_id: std::os::raw::c_long, exc: &Py
}

#[pyfunction]
fn start_temporalite<'a>(
fn start_dev_server<'a>(
py: Python<'a>,
runtime_ref: &runtime::RuntimeRef,
config: testing::TemporaliteConfig,
config: testing::DevServerConfig,
) -> PyResult<&'a PyAny> {
testing::start_temporalite(py, &runtime_ref, config)
testing::start_dev_server(py, &runtime_ref, config)
}

#[pyfunction]
Expand Down
16 changes: 8 additions & 8 deletions temporalio/bridge/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct EphemeralServerRef {
}

#[derive(FromPyObject)]
pub struct TemporaliteConfig {
pub struct DevServerConfig {
existing_path: Option<String>,
sdk_name: String,
sdk_version: String,
Expand All @@ -38,17 +38,17 @@ pub struct TestServerConfig {
extra_args: Vec<String>,
}

pub fn start_temporalite<'a>(
pub fn start_dev_server<'a>(
py: Python<'a>,
runtime_ref: &runtime::RuntimeRef,
config: TemporaliteConfig,
config: DevServerConfig,
) -> PyResult<&'a PyAny> {
let opts: ephemeral_server::TemporaliteConfig = config.try_into()?;
let opts: ephemeral_server::TemporalDevServerConfig = config.try_into()?;
let runtime = runtime_ref.runtime.clone();
runtime_ref.runtime.future_into_py(py, async move {
Ok(EphemeralServerRef {
server: Some(opts.start_server().await.map_err(|err| {
PyRuntimeError::new_err(format!("Failed starting Temporalite: {}", err))
PyRuntimeError::new_err(format!("Failed starting Temporal dev server: {}", err))
})?),
runtime,
})
Expand Down Expand Up @@ -105,11 +105,11 @@ impl EphemeralServerRef {
}
}

impl TryFrom<TemporaliteConfig> for ephemeral_server::TemporaliteConfig {
impl TryFrom<DevServerConfig> for ephemeral_server::TemporalDevServerConfig {
type Error = PyErr;

fn try_from(conf: TemporaliteConfig) -> PyResult<Self> {
ephemeral_server::TemporaliteConfigBuilder::default()
fn try_from(conf: DevServerConfig) -> PyResult<Self> {
ephemeral_server::TemporalDevServerConfigBuilder::default()
.exe(if let Some(existing_path) = conf.existing_path {
ephemeral_server::EphemeralExe::ExistingPath(existing_path.to_owned())
} else {
Expand Down
12 changes: 6 additions & 6 deletions temporalio/bridge/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@


@dataclass
class TemporaliteConfig:
"""Python representation of the Rust struct for configuring Temporalite."""
class DevServerConfig:
"""Python representation of the Rust struct for configuring dev server."""

existing_path: Optional[str]
sdk_name: str
Expand Down Expand Up @@ -48,12 +48,12 @@ class EphemeralServer:
"""Python representation of a Rust ephemeral server."""

@staticmethod
async def start_temporalite(
runtime: temporalio.bridge.runtime.Runtime, config: TemporaliteConfig
async def start_dev_server(
runtime: temporalio.bridge.runtime.Runtime, config: DevServerConfig
) -> EphemeralServer:
"""Start a Temporalite instance."""
"""Start a dev server instance."""
return EphemeralServer(
await temporalio.bridge.temporal_sdk_bridge.start_temporalite(
await temporalio.bridge.temporal_sdk_bridge.start_dev_server(
runtime._ref, config
)
)
Expand Down
8 changes: 8 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2411,6 +2411,14 @@ class ScheduleRange:
Unset or 0 defaults as 1.
"""

def __post_init__(self):
"""Set field defaults."""
# Class is frozen, so we must setattr bypassing dataclass setattr
if self.end < self.start:
object.__setattr__(self, "end", self.start)
if self.step == 0:
object.__setattr__(self, "step", 1)

@staticmethod
def _from_protos(
ranges: Sequence[temporalio.api.schedule.v1.Range],
Expand Down
87 changes: 43 additions & 44 deletions temporalio/testing/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ async def start_local(
download_dest_dir: Optional[str] = None,
ui: bool = False,
runtime: Optional[temporalio.runtime.Runtime] = None,
temporalite_existing_path: Optional[str] = None,
temporalite_database_filename: Optional[str] = None,
temporalite_log_format: str = "pretty",
temporalite_log_level: Optional[str] = "warn",
temporalite_download_version: str = "default",
temporalite_extra_args: Sequence[str] = [],
dev_server_existing_path: Optional[str] = None,
dev_server_database_filename: Optional[str] = None,
dev_server_log_format: str = "pretty",
dev_server_log_level: Optional[str] = "warn",
dev_server_download_version: str = "default",
dev_server_extra_args: Sequence[str] = [],
) -> WorkflowEnvironment:
"""Start a full Temporal server locally, downloading if necessary.

Expand All @@ -106,16 +106,15 @@ async def start_local(
environment. :py:meth:`sleep` will sleep the actual amount of time and
:py:meth:`get_current_time` will return the current time.

Internally, this uses
`Temporalite <https://github.com/temporalio/temporalite>`_. Which is a
self-contained binary for Temporal using Sqlite persistence. This will
download Temporalite to a temporary directory by default if it has not
already been downloaded before and ``temporalite_existing_path`` is not
set.
Internally, this uses the Temporal CLI dev server from
https://github.com/temporalio/cli. This is a self-contained binary for
Temporal using Sqlite persistence. This call will download the CLI to a
temporary directory by default if it has not already been downloaded
before and ``dev_server_existing_path`` is not set.

In the future, the Temporalite implementation may be changed to another
implementation. Therefore, all ``temporalite_`` prefixed parameters are
Temporalite specific and may not apply to newer versions.
In the future, the dev server implementation may be changed to another
implementation. Therefore, all ``dev_server_`` prefixed parameters are
dev-server specific and may not apply to newer versions.

Args:
namespace: Namespace name to use for this environment.
Expand All @@ -137,55 +136,55 @@ async def start_local(
port: Port number to bind to, or an OS-provided port by default.
download_dest_dir: Directory to download binary to if a download is
needed. If unset, this is the system's temporary directory.
ui: If ``True``, will start a UI in Temporalite.
ui: If ``True``, will start a UI in the dev server.
runtime: Specific runtime to use or default if unset.
temporalite_existing_path: Existing path to the Temporalite binary.
dev_server_existing_path: Existing path to the CLI binary.
If present, no download will be attempted to fetch the binary.
temporalite_database_filename: Path to the Sqlite database to use
for Temporalite. Unset default means only in-memory Sqlite will
be used.
temporalite_log_format: Log format for Temporalite.
temporalite_log_level: Log level to use for Temporalite. Default is
``warn``, but if set to ``None`` this will translate the Python
logger's level to a Temporalite level.
temporalite_download_version: Specific Temporalite version to
download. Defaults to ``default`` which downloads the version
known to work best with this SDK.
temporalite_extra_args: Extra arguments for the Temporalite binary.
dev_server_database_filename: Path to the Sqlite database to use
for the dev server. Unset default means only in-memory Sqlite
will be used.
dev_server_log_format: Log format for the dev server.
dev_server_log_level: Log level to use for the dev server. Default
is ``warn``, but if set to ``None`` this will translate the
Python logger's level to a dev server log level.
dev_server_download_version: Specific CLI version to download.
Defaults to ``default`` which downloads the version known to
work best with this SDK.
dev_server_extra_args: Extra arguments for the CLI binary.

Returns:
The started Temporalite workflow environment.
The started CLI dev server workflow environment.
"""
# Use the logger's configured level if none given
if not temporalite_log_level:
if not dev_server_log_level:
if logger.isEnabledFor(logging.DEBUG):
temporalite_log_level = "debug"
dev_server_log_level = "debug"
elif logger.isEnabledFor(logging.INFO):
temporalite_log_level = "info"
dev_server_log_level = "info"
elif logger.isEnabledFor(logging.WARNING):
temporalite_log_level = "warn"
dev_server_log_level = "warn"
elif logger.isEnabledFor(logging.ERROR):
temporalite_log_level = "error"
dev_server_log_level = "error"
else:
temporalite_log_level = "fatal"
# Start Temporalite
dev_server_log_level = "fatal"
# Start CLI dev server
runtime = runtime or temporalio.runtime.Runtime.default()
server = await temporalio.bridge.testing.EphemeralServer.start_temporalite(
server = await temporalio.bridge.testing.EphemeralServer.start_dev_server(
runtime._core_runtime,
temporalio.bridge.testing.TemporaliteConfig(
existing_path=temporalite_existing_path,
temporalio.bridge.testing.DevServerConfig(
existing_path=dev_server_existing_path,
sdk_name="sdk-python",
sdk_version=temporalio.service.__version__,
download_version=temporalite_download_version,
download_version=dev_server_download_version,
download_dest_dir=download_dest_dir,
namespace=namespace,
ip=ip,
port=port,
database_filename=temporalite_database_filename,
database_filename=dev_server_database_filename,
ui=ui,
log_format=temporalite_log_format,
log_level=temporalite_log_level,
extra_args=temporalite_extra_args,
log_format=dev_server_log_format,
log_level=dev_server_log_level,
extra_args=dev_server_extra_args,
),
)
# If we can't connect to the server, we should shut it down
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def env_type(request: pytest.FixtureRequest) -> str:
async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
if env_type == "local":
env = await WorkflowEnvironment.start_local(
temporalite_extra_args=[
dev_server_extra_args=[
"--dynamic-config-value",
"system.forceSearchAttributesCacheRefreshOnRead=true",
]
Expand Down
49 changes: 29 additions & 20 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
Optional,
Sequence,
Tuple,
Type,
cast,
)

Expand All @@ -32,11 +31,11 @@
from temporalio.api.common.v1 import Payload, Payloads, WorkflowExecution
from temporalio.api.enums.v1 import EventType, IndexedValueType
from temporalio.api.failure.v1 import Failure
from temporalio.api.operatorservice.v1 import AddSearchAttributesRequest
from temporalio.api.workflowservice.v1 import (
GetSearchAttributesRequest,
GetWorkflowExecutionHistoryRequest,
from temporalio.api.operatorservice.v1 import (
AddSearchAttributesRequest,
ListSearchAttributesRequest,
)
from temporalio.api.workflowservice.v1 import GetWorkflowExecutionHistoryRequest
from temporalio.bridge.proto.workflow_activation import WorkflowActivation
from temporalio.bridge.proto.workflow_completion import WorkflowActivationCompletion
from temporalio.client import (
Expand Down Expand Up @@ -1356,9 +1355,11 @@ def do_search_attribute_update(self) -> None:
empty_float_list: List[float] = []
workflow.upsert_search_attributes(
{
f"{sa_prefix}text": ["text3"],
# We intentionally leave keyword off to confirm it still comes back
f"{sa_prefix}int": [123, 456],
f"{sa_prefix}text": ["text2"],
# We intentionally leave keyword off to confirm it still comes
# back but replace keyword list
f"{sa_prefix}keyword_list": ["keywordlist3", "keywordlist4"],
f"{sa_prefix}int": [456],
# Empty list to confirm removed
f"{sa_prefix}double": empty_float_list,
f"{sa_prefix}bool": [False],
Expand All @@ -1374,18 +1375,20 @@ async def test_workflow_search_attributes(client: Client, env_type: str):
pytest.skip("Only testing search attributes on local which disables cache")

async def search_attributes_present() -> bool:
resp = await client.workflow_service.get_search_attributes(
GetSearchAttributesRequest()
resp = await client.operator_service.list_search_attributes(
ListSearchAttributesRequest(namespace=client.namespace)
)
return any(k for k in resp.keys.keys() if k.startswith(sa_prefix))
return any(k for k in resp.custom_attributes.keys() if k.startswith(sa_prefix))

# Add search attributes if not already present
if not await search_attributes_present():
await client.operator_service.add_search_attributes(
AddSearchAttributesRequest(
namespace=client.namespace,
search_attributes={
f"{sa_prefix}text": IndexedValueType.INDEXED_VALUE_TYPE_TEXT,
f"{sa_prefix}keyword": IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD,
f"{sa_prefix}keyword_list": IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD_LIST,
f"{sa_prefix}int": IndexedValueType.INDEXED_VALUE_TYPE_INT,
f"{sa_prefix}double": IndexedValueType.INDEXED_VALUE_TYPE_DOUBLE,
f"{sa_prefix}bool": IndexedValueType.INDEXED_VALUE_TYPE_BOOL,
Expand All @@ -1402,29 +1405,31 @@ async def search_attributes_present() -> bool:
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
search_attributes={
f"{sa_prefix}text": ["text1", "text2", "text0"],
f"{sa_prefix}text": ["text1"],
f"{sa_prefix}keyword": ["keyword1"],
f"{sa_prefix}keyword_list": ["keywordlist1", "keywordlist2"],
f"{sa_prefix}int": [123],
f"{sa_prefix}double": [456.78],
f"{sa_prefix}bool": [True],
f"{sa_prefix}datetime": [
# With UTC
datetime(2001, 2, 3, 4, 5, 6, tzinfo=timezone.utc),
# With other offset
datetime(2002, 3, 4, 5, 6, 7, tzinfo=timezone(timedelta(hours=8))),
datetime(2001, 2, 3, 4, 5, 6, tzinfo=timezone.utc)
],
},
)
# Make sure it started with the right attributes
expected = {
f"{sa_prefix}text": {"type": "str", "values": ["text1", "text2", "text0"]},
f"{sa_prefix}text": {"type": "str", "values": ["text1"]},
f"{sa_prefix}keyword": {"type": "str", "values": ["keyword1"]},
f"{sa_prefix}keyword_list": {
"type": "str",
"values": ["keywordlist1", "keywordlist2"],
},
f"{sa_prefix}int": {"type": "int", "values": [123]},
f"{sa_prefix}double": {"type": "float", "values": [456.78]},
f"{sa_prefix}bool": {"type": "bool", "values": [True]},
f"{sa_prefix}datetime": {
"type": "datetime",
"values": ["2001-02-03 04:05:06+00:00", "2002-03-04 05:06:07+08:00"],
"values": ["2001-02-03 04:05:06+00:00"],
},
}
assert expected == await handle.query(
Expand All @@ -1434,9 +1439,13 @@ async def search_attributes_present() -> bool:
# Do an attribute update and check query
await handle.signal(SearchAttributeWorkflow.do_search_attribute_update)
expected = {
f"{sa_prefix}text": {"type": "str", "values": ["text3"]},
f"{sa_prefix}text": {"type": "str", "values": ["text2"]},
f"{sa_prefix}keyword": {"type": "str", "values": ["keyword1"]},
f"{sa_prefix}int": {"type": "int", "values": [123, 456]},
f"{sa_prefix}keyword_list": {
"type": "str",
"values": ["keywordlist3", "keywordlist4"],
},
f"{sa_prefix}int": {"type": "int", "values": [456]},
f"{sa_prefix}double": {"type": "<unknown>", "values": []},
f"{sa_prefix}bool": {"type": "bool", "values": [False]},
f"{sa_prefix}datetime": {
Expand Down
Loading