diff --git a/tutorials/custom_reducers.ipynb b/tutorials/custom_reducers.ipynb
new file mode 100644
index 0000000..345f2df
--- /dev/null
+++ b/tutorials/custom_reducers.ipynb
@@ -0,0 +1,401 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-colab"
+ },
+ "source": [
+ "
"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Installing Pathway with Python 3.10+\n",
+ "\n",
+ "In the cell below, we install Pathway into a Python 3.10+ Linux runtime.\n",
+ "\n",
+ "> **If you are running in Google Colab, please run the colab notebook (Ctrl+F9)**, disregarding the 'not authored by Google' warning.\n",
+ "> \n",
+ "> **The installation and loading time is less than 1 minute**.\n"
+ ],
+ "metadata": {
+ "id": "notebook-instructions"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "%%capture --no-display\n",
+ "!pip install --prefer-binary pathway"
+ ],
+ "metadata": {
+ "id": "pip-installation-pathway",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e8f4ea6d",
+ "metadata": {
+ "lines_to_next_cell": 0
+ },
+ "source": [
+ "# Writing simple custom reducer in Pathway\n",
+ "\n",
+ "Pathway supports natively aggregation using a wide range of [reducers](/developers/api-docs/reducers/), e.g., [`sum`](/developers/api-docs/reducers/#pathway.reducers.sum), [`count`](/developers/api-docs/reducers/#pathway.reducers.count), or [`max`](/developers/api-docs/reducers/#pathway.reducers.max). However, those might not cover all the necessary ways of aggregating values. In this tutorial, you learn how to write reducers implementing custom logic.\n",
+ "\n",
+ "For example, let's implement a custom `stdev` reducer that computes the standard deviation."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "59e7d39b",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:11:07.318543Z",
+ "iopub.status.busy": "2024-03-21T14:11:07.317864Z",
+ "iopub.status.idle": "2024-03-21T14:11:13.981109Z",
+ "shell.execute_reply": "2024-03-21T14:11:13.980722Z"
+ },
+ "lines_to_next_cell": 0
+ },
+ "outputs": [],
+ "source": [
+ "import pathway as pw\n",
+ "\n",
+ "SHOW_DEBUG = False\n",
+ "\n",
+ "\n",
+ "class StdDevAccumulator(pw.BaseCustomAccumulator):\n",
+ " def __init__(self, cnt, sum, sum_sq):\n",
+ " self.cnt = cnt\n",
+ " self.sum = sum\n",
+ " self.sum_sq = sum_sq\n",
+ "\n",
+ " @classmethod\n",
+ " def from_row(cls, row):\n",
+ " [val] = row\n",
+ " if SHOW_DEBUG:\n",
+ " print(\"from_row()\")\n",
+ " return cls(1, val, val**2)\n",
+ "\n",
+ " def update(self, other):\n",
+ " self.cnt += other.cnt\n",
+ " self.sum += other.sum\n",
+ " self.sum_sq += other.sum_sq\n",
+ " if SHOW_DEBUG:\n",
+ " print(\"update()\")\n",
+ "\n",
+ " def compute_result(self) -> float:\n",
+ " mean = self.sum / self.cnt\n",
+ " mean_sq = self.sum_sq / self.cnt\n",
+ " if SHOW_DEBUG:\n",
+ " print(\"compute_result()\")\n",
+ " return mean_sq - mean**2\n",
+ "\n",
+ "\n",
+ "stddev = pw.reducers.udf_reducer(StdDevAccumulator)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "71454959",
+ "metadata": {
+ "lines_to_next_cell": 0
+ },
+ "source": [
+ "Above, the [`pw.BaseCustomAccumulator`](/developers/api-docs/pathway/#pathway.BaseCustomAccumulator) class is used as a base for the `StdDevAccumulator`, which describes the logic of the underlying accumulator. The accumulator class requires a few methods:\n",
+ "* [`from_row`](/developers/api-docs/pathway/#pathway.BaseCustomAccumulator.from_row), which constructs an accumulator from the values of a single row of a table (here, a single value since our reducer applies to a single column),\n",
+ "* [`update`](/developers/api-docs/pathway/#pathway.BaseCustomAccumulator.update), which updates one accumulator by another accumulator,\n",
+ "* [`compute_result`](/developers/api-docs/pathway/#pathway.BaseCustomAccumulator.compute_result), which produces the output based on the accumulator state,\n",
+ "* [`retract`](/developers/api-docs/pathway/#pathway.BaseCustomAccumulator.retract), is an optional method, which processes negative updates,\n",
+ "* [`neutral`](/developers/api-docs/pathway/#pathway.BaseCustomAccumulator.neutral), is an optional method, which returns state corresponding to consuming 0 rows.\n",
+ "\n",
+ "Now, let's see the reducer in action."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "54120407",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:11:13.983535Z",
+ "iopub.status.busy": "2024-03-21T14:11:13.982914Z",
+ "iopub.status.idle": "2024-03-21T14:11:14.167942Z",
+ "shell.execute_reply": "2024-03-21T14:11:14.167441Z"
+ },
+ "lines_to_next_cell": 0
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:11:14]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | avg | stddev\n",
+ "^PWSRT42... | 25.933333333333334 | 3.3355555555555156\n"
+ ]
+ }
+ ],
+ "source": [
+ "temperature_data = pw.debug.table_from_markdown(\n",
+ " \"\"\"\n",
+ "date | temperature\n",
+ "2023-06-06 | 28.0\n",
+ "2023-06-07 | 23.1\n",
+ "2023-06-08 | 24.5\n",
+ "2023-06-09 | 26.0\n",
+ "2023-06-10 | 28.3\n",
+ "2023-06-11 | 25.7\n",
+ "\"\"\"\n",
+ ")\n",
+ "\n",
+ "temperature_statistics = temperature_data.reduce(\n",
+ " avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)\n",
+ ")\n",
+ "\n",
+ "pw.debug.compute_and_print(temperature_statistics)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "8a5f58ff",
+ "metadata": {
+ "lines_to_next_cell": 0
+ },
+ "source": [
+ "However, with this logic, our reducer is not smartly processing negative updates: it starts the computation from scratch whenever a negative update is encountered.\n",
+ "You can see this in action by enabling debug information and processing table where row removal happens. Let's insert several values at time 0 and then remove one already inserted value and add another at time 2."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "9f122693",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:11:14.170018Z",
+ "iopub.status.busy": "2024-03-21T14:11:14.169685Z",
+ "iopub.status.idle": "2024-03-21T14:11:14.210836Z",
+ "shell.execute_reply": "2024-03-21T14:11:14.210435Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:11:14]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "from_row()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "compute_result()\n",
+ "compute_result()\n",
+ " | avg | stddev\n",
+ "^PWSRT42... | 25.96666666666667 | 3.3255555555555247\n"
+ ]
+ }
+ ],
+ "source": [
+ "SHOW_DEBUG = True\n",
+ "temperature_data_with_updates = pw.debug.table_from_markdown(\n",
+ " \"\"\"\n",
+ "date | temperature | __time__ | __diff__\n",
+ "2023-06-06 | 28.0 | 0 | 1\n",
+ "2023-06-07 | 23.1 | 0 | 1\n",
+ "2023-06-08 | 24.5 | 0 | 1\n",
+ "2023-06-09 | 26.0 | 0 | 1\n",
+ "2023-06-10 | 28.3 | 0 | 1\n",
+ "2023-06-11 | 25.7 | 0 | 1\n",
+ "2023-06-11 | 25.7 | 2 | -1\n",
+ "2023-06-11 | 25.9 | 2 | 1\n",
+ "\"\"\"\n",
+ ")\n",
+ "\n",
+ "temperature_statistics_with_updates = temperature_data_with_updates.reduce(\n",
+ " avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)\n",
+ ")\n",
+ "\n",
+ "pw.debug.compute_and_print(temperature_statistics_with_updates)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "9b690ba3",
+ "metadata": {
+ "lines_to_next_cell": 0
+ },
+ "source": [
+ "It can be alleviated by extending our reducer and providing a method for processing negative updates."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "b500cbf2",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:11:14.212452Z",
+ "iopub.status.busy": "2024-03-21T14:11:14.212305Z",
+ "iopub.status.idle": "2024-03-21T14:11:14.214926Z",
+ "shell.execute_reply": "2024-03-21T14:11:14.214574Z"
+ },
+ "lines_to_next_cell": 0
+ },
+ "outputs": [],
+ "source": [
+ "class ImprovedStdDevAccumulator(StdDevAccumulator):\n",
+ " def retract(self, other):\n",
+ " self.cnt -= other.cnt\n",
+ " self.sum -= other.sum\n",
+ " self.sum_sq -= other.sum_sq\n",
+ " if SHOW_DEBUG:\n",
+ " print(\"retract()\")\n",
+ "\n",
+ "\n",
+ "improved_stddev = pw.reducers.udf_reducer(ImprovedStdDevAccumulator)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "adae8a37",
+ "metadata": {
+ "lines_to_next_cell": 0
+ },
+ "source": [
+ "And now you can test the improved reducer in action."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "3c056eae",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:11:14.216305Z",
+ "iopub.status.busy": "2024-03-21T14:11:14.216184Z",
+ "iopub.status.idle": "2024-03-21T14:11:14.250113Z",
+ "shell.execute_reply": "2024-03-21T14:11:14.249652Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:11:14]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "from_row()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "update()\n",
+ "from_row()\n",
+ "retract()\n",
+ "compute_result()\n",
+ "compute_result()\n",
+ " | avg | stddev\n",
+ "^PWSRT42... | 25.96666666666667 | 3.3255555555555247\n"
+ ]
+ }
+ ],
+ "source": [
+ "\n",
+ "temperature_statistics_improved = temperature_data_with_updates.reduce(\n",
+ " avg=pw.reducers.avg(pw.this.temperature),\n",
+ " stddev=improved_stddev(pw.this.temperature),\n",
+ ")\n",
+ "\n",
+ "pw.debug.compute_and_print(temperature_statistics_improved)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ed31b65f",
+ "metadata": {},
+ "source": [
+ "In the example above, 10x calls to `update()` and 12x calls to `from_row()` are replaced with 6x calls to `update()`, 1x call to `retract()` and 8x calls to `from_row()`.\n",
+ "\n",
+ "This comes from the fact that former reducer:\n",
+ "* had to call `from_row()` for each row of the table, wrapping each single value into separate `StdDevAccumulator` object,\n",
+ "* had to call `update()` for each row of the table except the first consumed,\n",
+ "* had to restart from scratch after the update to the table, thus it had to pay the cost twice.\n",
+ "\n",
+ "While the latter reducer aggregated the table at time 0 in the same way as former one, but processed the update differently:\n",
+ "* had to wrap both delete and insert updates with `from_row()` calls\n",
+ "* called once `retract()` and once `update()`."
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.8"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
\ No newline at end of file
diff --git a/tutorials/windows_temporal_behavior.ipynb b/tutorials/windows_temporal_behavior.ipynb
new file mode 100644
index 0000000..1b9d49a
--- /dev/null
+++ b/tutorials/windows_temporal_behavior.ipynb
@@ -0,0 +1,852 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-colab"
+ },
+ "source": [
+ "
"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Installing Pathway with Python 3.10+\n",
+ "\n",
+ "In the cell below, we install Pathway into a Python 3.10+ Linux runtime.\n",
+ "\n",
+ "> **If you are running in Google Colab, please run the colab notebook (Ctrl+F9)**, disregarding the 'not authored by Google' warning.\n",
+ "> \n",
+ "> **The installation and loading time is less than 1 minute**.\n"
+ ],
+ "metadata": {
+ "id": "notebook-instructions"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "%%capture --no-display\n",
+ "!pip install --prefer-binary pathway"
+ ],
+ "metadata": {
+ "id": "pip-installation-pathway",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "id": "4685949d",
+ "metadata": {},
+ "source": [
+ "# Controlling temporal behavior of windows"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3d902862",
+ "metadata": {},
+ "source": [
+ "In this article you will learn how to use windows effectively, by specifying their temporal behaviors."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d511b603",
+ "metadata": {},
+ "source": [
+ "Temporal behaviors in Pathway are crucial for bounding memory consumption, maintaining proper tradeoff between latency and efficiency, or adjusting windows for your needs. To read more about behaviors and the motivation behind them read our [guide](/developers/user-guide/temporal-data/behaviors/). This article goes into detail on how to define the behavior using `common_behavior` and `exactly_once_behavior` and what impact they have on the result of [windows](/developers/user-guide/temporal-data/windows-manual)."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1a8e6d55",
+ "metadata": {},
+ "source": [
+ "The examples in this article use the sliding windows, but you can also use behaviors with tumbling windows."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "43ddb36c",
+ "metadata": {},
+ "source": [
+ "## Event Time vs Processing Time"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0a52ffec",
+ "metadata": {},
+ "source": [
+ "In the context of temporal behavior it is important to distinguish between an event time and a processing time. The event time is when the event happens, e.g. if your data are orders in the online shop, the event time is the time when the order happened. This information has to be present in your data because Pathway doesn't know when the event happened. Thus event time can be any time you assign to your data.\n",
+ "\n",
+ "The only time Pathway is aware of is when the record arrives to the Pathway engine. This time is called processing time. While the processing time of entries in a stream is always nondecreasing (because the time goes forward), due to latency the event time may be out of order. In extreme cases, this can manifest via events with _high_ latency between their event time and processing time, which we shortly call _late data_.\n",
+ "\n",
+ "When grouping data in windows, you usually want to consider the event time, and the temporal behavior is based on it, but the order in which the events are processed impacts the results."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "eaa0a0d2",
+ "metadata": {},
+ "source": [
+ "![Event time vs processing time](https://pathway.com/assets/content/documentation/behavior-guide/event-time-vs-processing-time.svg)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5bba98e6",
+ "metadata": {},
+ "source": [
+ ""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "69b96b12",
+ "metadata": {},
+ "source": [
+ "## Dataset"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5038105a",
+ "metadata": {},
+ "source": [
+ "To try out the temporal behaviors of windows you need an example Pathway Table with both processing time and event time. You can generate it using `pw.debug.table_from_markdown`, which takes a table specification in markdown format. If it has a column named `__time__`, Pathway will use it as a processing time, which allows you to see how the temporality of your data affects the outcome of the computation. The following code creates a table with logs. Other than the `__time__` column, it also has the `event_time`, which says when the event described by the log happened, and the `message` column. In this case, both `__time__` and `event_time` are given as timestamps.\n",
+ "\n",
+ "Remarks:\n",
+ "- while the processing time for the `table_from_markdown` method always needs to be given as a timestamp, the event_time can be any of [various types that are supported by the windowing mechanism](/developers/api-docs/pathway-stdlib-temporal/#pathway.stdlib.temporal.windowby)\n",
+ "- the `table_from_markdown` method needs the processing time to be passed in a column with a special name `__time__`, but the column holding event_time is passed as a parameter to the [`windowby`](/developers/api-docs/pathway-stdlib-temporal/#pathway.stdlib.temporal.windowby) function, and here it is called event_time just to keep the example self-explanatory."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "fd196937",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:52.683259Z",
+ "iopub.status.busy": "2024-03-21T14:12:52.683079Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.344246Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.343675Z"
+ }
+ },
+ "outputs": [],
+ "source": [
+ "import pathway as pw"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "fc869e67",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.347868Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.346782Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.375088Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.374596Z"
+ }
+ },
+ "outputs": [],
+ "source": [
+ "t = pw.debug.table_from_markdown(\n",
+ " \"\"\"\n",
+ " event_time | message | __time__\n",
+ " 360 | Processing_started | 362\n",
+ " 362 | Task_completed_successfully | 362\n",
+ " 366 | Error_occurred_during_processing | 368\n",
+ " 370 | Data_received_from_sensor | 410\n",
+ " 370 | Database_connection_established | 370\n",
+ " 370 | File_saved_successfully | 372\n",
+ " 372 | Processing_completed | 374\n",
+ " 376 | Request_received_from_user | 396\n",
+ " 382 | Task_in_progress | 382\n",
+ " 382 | Warning_Low_memory | 392\n",
+ "\"\"\"\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "598ff941",
+ "metadata": {},
+ "source": [
+ "Consider the following example scenario - you are given the table as defined above, and you need to count the number of logs that fall into 10-second windows, with windows starting every 4 seconds.\n",
+ "\n",
+ "To that end, you can use sliding windows. To keep things simple, start with a piece of code that only groups data into windows, without specifying temporal behaviors. As you can see in the code snippet below, you can do that using `windowby` with `sliding` window of `duration` set to 10 and `hop` set to 4. For the result, keep information about the start and the end of each window and the number of logs that are in those windows."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "f6a606be",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.377601Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.377346Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.499824Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.499368Z"
+ }
+ },
+ "outputs": [],
+ "source": [
+ "result = t.windowby(\n",
+ " t.event_time,\n",
+ " window=pw.temporal.sliding(duration=10, hop=4),\n",
+ ").reduce(\n",
+ " window_start=pw.this._pw_window_start,\n",
+ " window_end=pw.this._pw_window_end,\n",
+ " n_logs=pw.reducers.count(),\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "bfd5fb69",
+ "metadata": {},
+ "source": [
+ "When you use [`pw.debug_compute_and_print`](/developers/api-docs/debug/#pathway.debug.compute_and_print) to print the results, you will only get the final result, after all input rows are processed."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "2385af18",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.502055Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.501836Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.625615Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.625218Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:58]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs\n",
+ "^Y8PEAZV... | 352 | 362 | 1\n",
+ "^YFKEFV6... | 356 | 366 | 2\n",
+ "^DX546QX... | 360 | 370 | 3\n",
+ "^B5NPNV8... | 364 | 374 | 5\n",
+ "^VRT1YTS... | 368 | 378 | 5\n",
+ "^QE3WQ0X... | 372 | 382 | 2\n",
+ "^XJDDRJ0... | 376 | 386 | 3\n",
+ "^R0SM1CF... | 380 | 390 | 2\n"
+ ]
+ }
+ ],
+ "source": [
+ "pw.debug.compute_and_print(result)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0b420fd4",
+ "metadata": {},
+ "source": [
+ "To understand how the result changed when new rows were processed, it is useful to use [`pw.debug.compute_and_print_update_stream`](/developers/api-docs/debug/#pathway.debug.compute_and_print_update_stream) function. It shows you every change made to the Table, with column `__diff__` denoting whether the row was added or removed."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "6207efb9",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.627054Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.626882Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.638464Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.638198Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:58]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs | __time__ | __diff__\n",
+ "^Y8PEAZV... | 352 | 362 | 1 | 362 | 1\n",
+ "^YFKEFV6... | 356 | 366 | 2 | 362 | 1\n",
+ "^DX546QX... | 360 | 370 | 2 | 362 | 1\n",
+ "^DX546QX... | 360 | 370 | 2 | 368 | -1\n",
+ "^DX546QX... | 360 | 370 | 3 | 368 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 1 | 368 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 1 | 370 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 2 | 370 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 1 | 370 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 2 | 372 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 1 | 372 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 372 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 2 | 372 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 374 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 2 | 374 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 374 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 374 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 374 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 1 | 382 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 1 | 382 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 1 | 392 | -1\n",
+ "^R0SM1CF... | 380 | 390 | 1 | 392 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 392 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 2 | 392 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 396 | -1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 396 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 396 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 4 | 396 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 2 | 396 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 3 | 396 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 410 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 4 | 410 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 5 | 410 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 5 | 410 | 1\n"
+ ]
+ }
+ ],
+ "source": [
+ "pw.debug.compute_and_print_update_stream(result)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "755130cd",
+ "metadata": {},
+ "source": [
+ "## What time is it?"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1d8c7fb",
+ "metadata": {},
+ "source": [
+ "The behaviors depend on the \"current time\" of an operator, in this article denoted as _now_. It is defined as the maximum already seen time by an operator in the already processed data (when a new batch of data arrives it is processed using the value of _now_ obtained from previous batches). In the context of windows, this time is taken from the column you use for grouping data in windows - usually event time. For example, `delay` sets a shift in time, and the window will be computed once _now_ is at least `delay` after the beginning of the window."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "bf60d31c",
+ "metadata": {},
+ "source": [
+ "## Common Behavior"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "792dff76",
+ "metadata": {},
+ "source": [
+ "The general way to define temporal behaviors in Pathway is by using `pw.temporal.common_behavior`. It allows you to set `delay`, `cutoff` and `keep_results` parameters. The `delay` and `cutoff` parameters represent time duration and their type should be compatible with the time column passed to `windowby`. This means that if your time column has type `int` or `float` then `delay` and `cutoff` should also have type, respectively, int or float. If instead, the time column has type [`DatetimeUtc`](/developers/api-docs/pathway/#pathway.DateTimeUtc) or [`DatetimeNaive`](/developers/api-docs/pathway/#pathway.DateTimeNaive), then `delay` and `cutoff` should have type [`Duration`](/developers/api-docs/pathway/#pathway.Duration). To understand the motivation of these parameters read our [guide on behaviors](/developers/user-guide/temporal-data/behaviors/)."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "98a908aa",
+ "metadata": {},
+ "source": [
+ "### Delay"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "f37deea1",
+ "metadata": {},
+ "source": [
+ "When you set the `delay` to be non-zero, the engine will wait before first calculating the result of each window. To be more precise, the window will be calculated, when _now_ is at least `window_start + delay`. If `delay` is not provided, it defaults to `None` which disables the delay mechanism."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "429d3375",
+ "metadata": {},
+ "source": [
+ "![Illustration of delay](https://pathway.com/assets/content/tutorials/windows_behavior/window-behavior-delay.svg)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a6f62f91",
+ "metadata": {},
+ "source": [
+ ""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "00302f4c",
+ "metadata": {},
+ "source": [
+ "You can use it to stagger calculations - this allows for more rows to be processed at once, rather than recomputing the result after each row arrives to the engine. If you set the `delay` in the log example to be 4, you will see that the update stream becomes shorter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "baee4ecf",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.640032Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.639788Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.761662Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.761394Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:58]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs | __time__ | __diff__\n",
+ "^Y8PEAZV... | 352 | 362 | 1 | 368 | 1\n",
+ "^YFKEFV6... | 356 | 366 | 2 | 368 | 1\n",
+ "^DX546QX... | 360 | 370 | 3 | 370 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 372 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 374 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 374 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 382 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 392 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 392 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 396 | -1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 396 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 396 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 4 | 396 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 2 | 396 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 3 | 396 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 410 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 4 | 410 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 5 | 410 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 5 | 410 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 2 | 18446744073709551614 | 1\n"
+ ]
+ }
+ ],
+ "source": [
+ "result_delay = t.windowby(\n",
+ " t.event_time,\n",
+ " window=pw.temporal.sliding(duration=10, hop=4),\n",
+ " behavior=pw.temporal.common_behavior(delay=4),\n",
+ ").reduce(\n",
+ " window_start=pw.this._pw_window_start,\n",
+ " window_end=pw.this._pw_window_end,\n",
+ " n_logs=pw.reducers.count(),\n",
+ ")\n",
+ "pw.debug.compute_and_print_update_stream(result_delay)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1c35148c",
+ "metadata": {},
+ "source": [
+ "You can notice in the `__time__` column an unexpected timestamp, that is `18446744073709551614`. That is because of the use of debug mode. As the input ended, the engine triggers the computation of the last window by setting _now_ to be maximum possible time. It won't happen in the streaming mode because the processing there never ends."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "011bd5c6",
+ "metadata": {},
+ "source": [
+ "### Cutoff"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0ce5b9e5",
+ "metadata": {},
+ "source": [
+ "Cutoff determines when the result of the window will no longer be updated, even if there is a change to a data point inside that window. This should not be before the windows closes - in such case you would shorten the window. When the `cutoff` is set, the window is no longer updated when _now_ is later than `window_end + cutoff`. If the `cutoff` is not provided, it defaults to `None` which disables the cutoff mechanism."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ac34cce7",
+ "metadata": {},
+ "source": [
+ "![Illustration of cutoff](https://pathway.com/assets/content/tutorials/windows_behavior/window-behavior-cutoff.svg)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "72a9cef7",
+ "metadata": {},
+ "source": [
+ ""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "85da85d3",
+ "metadata": {},
+ "source": [
+ "Now add `cutoff=4` to the log example. You should see that the row that has processing time `410` no longer impacts the results. When you use `cutoff` omitting such late points means that you get different results than if you processed everything in batch, as the data that comes after the cutoff of a window will not be used in calculations for this window. This, however, is necessary for efficient memory consumption - without setting `cutoff` all data that ever was processed needs to be kept in memory, in case some very late event arrives and a window needs to be recomputed. When you use the cutoff mechanism you inform the engine when it can clear the memory."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "id": "4346e71f",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.763072Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.762905Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.974964Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.974710Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:58]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs | __time__ | __diff__\n",
+ "^Y8PEAZV... | 352 | 362 | 1 | 362 | 1\n",
+ "^YFKEFV6... | 356 | 366 | 2 | 362 | 1\n",
+ "^DX546QX... | 360 | 370 | 2 | 362 | 1\n",
+ "^DX546QX... | 360 | 370 | 2 | 368 | -1\n",
+ "^DX546QX... | 360 | 370 | 3 | 368 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 1 | 368 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 1 | 370 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 2 | 370 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 1 | 370 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 2 | 372 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 1 | 372 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 372 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 2 | 372 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 374 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 2 | 374 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 374 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 374 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 374 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 1 | 382 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 1 | 382 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 1 | 392 | -1\n",
+ "^R0SM1CF... | 380 | 390 | 1 | 392 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 392 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 2 | 392 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 396 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 396 | -1\n",
+ "^QE3WQ0X... | 372 | 382 | 2 | 396 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 3 | 396 | 1\n"
+ ]
+ }
+ ],
+ "source": [
+ "result_cutoff = t.windowby(\n",
+ " t.event_time,\n",
+ " window=pw.temporal.sliding(duration=10, hop=4),\n",
+ " behavior=pw.temporal.common_behavior(cutoff=4),\n",
+ ").reduce(\n",
+ " window_start=pw.this._pw_window_start,\n",
+ " window_end=pw.this._pw_window_end,\n",
+ " n_logs=pw.reducers.count(),\n",
+ ")\n",
+ "pw.debug.compute_and_print_update_stream(result_cutoff)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "94ee8d1a",
+ "metadata": {},
+ "source": [
+ "Note that since the time when cutoff triggers is based only on the window end and `cutoff` value, an event belonging to multiple windows can be late - and ignored in calculations - for one window, but on time for another. In the above example, you can notice that at time `396`. At this time the event with `event_time` equal to `376` arrives to the engine, so it belongs to 3 windows - starting at times `368`, `372` and `376`. But since for the first of these windows, we are past its cutoff when this event arrives, only the other two windows are recalculated."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7770b428",
+ "metadata": {},
+ "source": [
+ "### Keep_results"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "887b4990",
+ "metadata": {},
+ "source": [
+ "The final argument of `common_behavior` - `keep_results` is only relevant if you use the cutoff mechanism. When set to `True`, its default value, the rows corresponding to windows already past cutoff are kept in the output table. You can see that by looking at the final state of the `result_cutoff` Table from the previous Section - it contains a record for each window."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "6644b3c7",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.976603Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.976487Z",
+ "iopub.status.idle": "2024-03-21T14:12:58.989289Z",
+ "shell.execute_reply": "2024-03-21T14:12:58.989006Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:58]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs\n",
+ "^Y8PEAZV... | 352 | 362 | 1\n",
+ "^YFKEFV6... | 356 | 366 | 2\n",
+ "^DX546QX... | 360 | 370 | 3\n",
+ "^B5NPNV8... | 364 | 374 | 4\n",
+ "^VRT1YTS... | 368 | 378 | 3\n",
+ "^QE3WQ0X... | 372 | 382 | 2\n",
+ "^XJDDRJ0... | 376 | 386 | 3\n",
+ "^R0SM1CF... | 380 | 390 | 2\n"
+ ]
+ }
+ ],
+ "source": [
+ "pw.debug.compute_and_print(result_cutoff)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "61cfa2e2",
+ "metadata": {},
+ "source": [
+ "If you set `keep_results=False`, however, once the window is past its cutoff, the record for this window is removed from the result Table, so, in the end, you are left only with the last few windows. The example use case is [log monitoring](/developers/showcases/realtime-log-monitoring), where you want to raise alerts based only on very recent windows."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "02079188",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:58.990629Z",
+ "iopub.status.busy": "2024-03-21T14:12:58.990375Z",
+ "iopub.status.idle": "2024-03-21T14:12:59.095968Z",
+ "shell.execute_reply": "2024-03-21T14:12:59.095579Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:59]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs\n",
+ "^QE3WQ0X... | 372 | 382 | 2\n",
+ "^XJDDRJ0... | 376 | 386 | 3\n",
+ "^R0SM1CF... | 380 | 390 | 2\n"
+ ]
+ }
+ ],
+ "source": [
+ "result_keep_results = t.windowby(\n",
+ " t.event_time,\n",
+ " window=pw.temporal.sliding(duration=10, hop=4, origin=360),\n",
+ " behavior=pw.temporal.common_behavior(cutoff=4, keep_results=False),\n",
+ ").reduce(\n",
+ " window_start=pw.this._pw_window_start,\n",
+ " window_end=pw.this._pw_window_end,\n",
+ " n_logs=pw.reducers.count(),\n",
+ ")\n",
+ "pw.debug.compute_and_print(result_keep_results)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a2214d15",
+ "metadata": {},
+ "source": [
+ "By checking the output of `compute_and_print_update_stream` you can see that each window was calculated at some point, but some of them were later removed."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "id": "b6a011ed",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:59.097241Z",
+ "iopub.status.busy": "2024-03-21T14:12:59.097080Z",
+ "iopub.status.idle": "2024-03-21T14:12:59.110004Z",
+ "shell.execute_reply": "2024-03-21T14:12:59.109692Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:59]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs | __time__ | __diff__\n",
+ "^DX546QX... | 360 | 370 | 2 | 362 | 1\n",
+ "^DX546QX... | 360 | 370 | 2 | 368 | -1\n",
+ "^DX546QX... | 360 | 370 | 3 | 368 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 1 | 368 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 1 | 370 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 2 | 370 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 1 | 370 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 2 | 372 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 1 | 372 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 372 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 2 | 372 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 3 | 374 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 2 | 374 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 374 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 374 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 374 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 1 | 382 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 1 | 382 | 1\n",
+ "^DX546QX... | 360 | 370 | 3 | 392 | -1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 392 | -1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 392 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 1 | 392 | -1\n",
+ "^R0SM1CF... | 380 | 390 | 1 | 392 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 392 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 2 | 392 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 1 | 396 | -1\n",
+ "^XJDDRJ0... | 376 | 386 | 2 | 396 | -1\n",
+ "^QE3WQ0X... | 372 | 382 | 2 | 396 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 3 | 396 | 1\n"
+ ]
+ }
+ ],
+ "source": [
+ "pw.debug.compute_and_print_update_stream(result_keep_results)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7950faec",
+ "metadata": {},
+ "source": [
+ "## Exactly Once Behavior"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "dc84dfdf",
+ "metadata": {},
+ "source": [
+ "For windows that you want to calculate exactly once, Pathway offers an easier way of defining behavior with `pw.temporal.exactly_once_behavior` function. It takes one optional argument, `shift`. Then a window will be calculated at time `_pw_window_end + shift`, and after that all changes to this window will be ignored. It is equivalent to using `pw.temporal.common_behavior` with `delay` set to `duration + shift` (`duration` is an argument to both [sliding](/developers/api-docs/temporal/#pathway.stdlib.temporal.sliding) and [tumbling](/developers/api-docs/temporal/#pathway.stdlib.temporal.tumbling) windows for setting the length of the window) and `cutoff` to `shift`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "b4ced7ee",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2024-03-21T14:12:59.111212Z",
+ "iopub.status.busy": "2024-03-21T14:12:59.111068Z",
+ "iopub.status.idle": "2024-03-21T14:12:59.262692Z",
+ "shell.execute_reply": "2024-03-21T14:12:59.262215Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[2024-03-21T15:12:59]:INFO:Preparing Pathway computation\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " | window_start | window_end | n_logs | __time__ | __diff__\n",
+ "^DX546QX... | 360 | 370 | 3 | 382 | 1\n",
+ "^B5NPNV8... | 364 | 374 | 4 | 392 | 1\n",
+ "^VRT1YTS... | 368 | 378 | 3 | 392 | 1\n",
+ "^QE3WQ0X... | 372 | 382 | 2 | 18446744073709551614 | 1\n",
+ "^XJDDRJ0... | 376 | 386 | 3 | 18446744073709551614 | 1\n",
+ "^R0SM1CF... | 380 | 390 | 2 | 18446744073709551614 | 1\n"
+ ]
+ }
+ ],
+ "source": [
+ "result_exactly_once = t.windowby(\n",
+ " t.event_time,\n",
+ " window=pw.temporal.sliding(duration=10, hop=4, origin=360),\n",
+ " behavior=pw.temporal.exactly_once_behavior(shift=2),\n",
+ ").reduce(\n",
+ " window_start=pw.this._pw_window_start,\n",
+ " window_end=pw.this._pw_window_end,\n",
+ " n_logs=pw.reducers.count(),\n",
+ ")\n",
+ "pw.debug.compute_and_print_update_stream(result_exactly_once)"
+ ]
+ }
+ ],
+ "metadata": {
+ "jupytext": {
+ "cell_metadata_filter": "-all",
+ "main_language": "python",
+ "notebook_metadata_filter": "-all"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.8"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
\ No newline at end of file