From 506635afeeab85dd0f16db722c250beb6bd57ea8 Mon Sep 17 00:00:00 2001 From: Peter Marshall Date: Wed, 3 Jul 2024 12:23:07 +0100 Subject: [PATCH] Create 16-native-schemaevolution.ipynb Moving old dimensions notebook with the view on schema evolution into a new branch. --- .../16-native-schemaevolution.ipynb | 827 ++++++++++++++++++ 1 file changed, 827 insertions(+) create mode 100644 notebooks/02-ingestion/16-native-schemaevolution.ipynb diff --git a/notebooks/02-ingestion/16-native-schemaevolution.ipynb b/notebooks/02-ingestion/16-native-schemaevolution.ipynb new file mode 100644 index 00000000..3030cc32 --- /dev/null +++ b/notebooks/02-ingestion/16-native-schemaevolution.ipynb @@ -0,0 +1,827 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0cb3b009-ebde-4d56-9d59-a028d66d8309", + "metadata": {}, + "source": [ + "# Define schemas for incoming stream data\n", + "\n", + "\n", + "Druid tables have an evolving schema that is realized dynamically from the data that you ingest.\n", + "\n", + "In streaming ingestion, the schema of the data is defined in the `dimensionsSpec`, and you can change this over time.\n", + "\n", + "This tutorial demonstrates various ways to work with the [dimensionsSpec](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec) from an example stream of events, showing schema evolution in action.\n", + "\n", + "In this tutorial, you perform the following tasks:\n", + "\n", + "- Set up a streaming ingestion from Apache Kafka.\n", + "- Start an ingestion that consumes specific dimensions and writes them into a table.\n", + "- Amend the ingestion to consume all but specific dimensions.\n", + "- Run an ingestion using automatic schema discovery." 
+ ] + }, + { + "cell_type": "markdown", + "id": "bbdbf6ad-ca7b-40f5-8ca3-1070f4a3ee42", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "This tutorial works with Druid 29.0.0 or later.\n", + "\n", + "#### Run with Docker\n", + "\n", + "Launch this tutorial and all prerequisites using the `all-services` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see the Learn Druid repository [readme](https://github.com/implydata/learn-druid).\n", + " " + ] + }, + { + "cell_type": "markdown", + "id": "5007a243-b81a-4601-8f57-5b14940abbff", + "metadata": {}, + "source": [ + "## Initialization\n", + "\n", + "The following cells set up the notebook and learning environment ready for use." + ] + }, + { + "cell_type": "markdown", + "id": "48c28c8b-1ae7-4b18-8c76-844375ab29cc", + "metadata": {}, + "source": [ + "### Set up and connect to the learning environment\n", + "\n", + "Run the next cell to set up the Druid Python client's connection to Apache Druid.\n", + "\n", + "If successful, the Druid version number will be shown in the output." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c1ec783b-df3f-4168-9be2-cdc6ad3e33c2", + "metadata": {}, + "outputs": [], + "source": [ + "import druidapi\n", + "import os\n", + "import requests\n", + "from datetime import datetime, timedelta\n", + "\n", + "if 'DRUID_HOST' not in os.environ.keys():\n", + " druid_host=f\"http://localhost:8888\"\n", + "else:\n", + " druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n", + " \n", + "print(f\"Opening a connection to {druid_host}.\")\n", + "druid = druidapi.jupyter_client(druid_host)\n", + "\n", + "display = druid.display\n", + "sql_client = druid.sql\n", + "status_client = druid.status\n", + "\n", + "status_client.version" + ] + }, + { + "cell_type": "markdown", + "id": "2efdbee0-62da-4fd3-84e1-f66b8c0150b3", + "metadata": {}, + "source": [ + "Run the next cell to set up the connection to Apache Kafka and Data Generator, and import helper functions for use later in the tutorial." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c075de81-04c9-4b23-8253-20a15d46252e", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import kafka\n", + "from kafka import KafkaConsumer\n", + "\n", + "datagen_host = \"http://datagen:9999\"\n", + "\n", + "datagen_headers = {'Content-Type': 'application/json'}\n", + "\n", + "if (os.environ['KAFKA_HOST'] == None):\n", + " kafka_host=f\"kafka:9092\"\n", + "else:\n", + " kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\"" + ] + }, + { + "cell_type": "markdown", + "id": "2a7b7439-ad21-4808-96b1-8e3c992fa51e", + "metadata": {}, + "source": [ + "### Start a data stream\n", + "\n", + "Run the next cell to use the learn-druid Data Generator to create a stream that we can consume from.\n", + "\n", + "This creates clickstream sample data for an hour and publishes it to a Kafka topic for Druid to consume from." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "897ec019-7145-4005-bb85-ea25eda7bf5a", + "metadata": {}, + "outputs": [], + "source": [ + "datagen_job=\"example-social-dimensions\"\n", + "kafka_topic = datagen_job\n", + "\n", + "target = {\n", + " \"type\":\"kafka\",\n", + " \"endpoint\": kafka_host,\n", + " \"topic\": kafka_topic\n", + "}\n", + "\n", + "datagen_starttime = \"2020-01-01 00:00:00\"\n", + "\n", + "datagen_request = {\n", + " \"name\": datagen_job,\n", + " \"target\": target,\n", + " \"config_file\": \"social/social_posts.json\",\n", + " \"concurrency\":50,\n", + " \"time_type\": datagen_starttime\n", + "}\n", + "\n", + "requests.post(f\"{datagen_host}/start\", json.dumps(datagen_request), headers=datagen_headers)\n", + "requests.get(f\"{datagen_host}/status/{datagen_job}\").json()" + ] + }, + { + "cell_type": "markdown", + "id": "89dde5e8-237e-4531-84c2-8647d92ceaea", + "metadata": {}, + "source": [ + "### Set up ingestion specification basics\n", + "\n", + "A streaming ingestion specification contains three parts:\n", + "\n", + "- [`ioConfig`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#ioconfig): sets the connection to the source data.\n", + "- [`tuningConfig`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#tuningconfig): sets specific tuning options for the ingestion.\n", + "- [`dataSchema`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dataschema): controls what happens to the data as it arrives and what the output should be.\n", + "\n", + "Run the following cell to create two objects to represent the `ioConfig` and `tuningConfig` that you will use throughout this notebook."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4fb89079-7e2a-404b-be85-d9fc7c97d0f1", + "metadata": {}, + "outputs": [], + "source": [ + "ioConfig = {\n", + " \"type\": \"kafka\",\n", + " \"consumerProperties\": {\n", + " \"bootstrap.servers\": \"kafka:9092\"\n", + " },\n", + " \"topic\": kafka_topic,\n", + " \"inputFormat\": {\n", + " \"type\": \"json\"\n", + " },\n", + " \"useEarliestOffset\": \"false\"\n", + "}\n", + "\n", + "tuningConfig = { \"type\": \"kafka\" }" + ] + }, + { + "cell_type": "markdown", + "id": "ef8fe393-9625-481a-98c1-767537a4a078", + "metadata": {}, + "source": [ + "The `dataSchema` is made of three parts, and is the focus of this notebook.\n", + "\n", + "1. [timestampSpec](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#timestampspec) and [granularitySpec](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec) define the primary timestamp (`__time`) and how to use this to partition data.\n", + "2. [dimensionsSpec](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec) defines what other measures and attributes to add to the table from the incoming dimensions.\n", + "\n", + "In this notebook you will work with all three parts to set up the timestamp and dimensions of an example table." + ] + }, + { + "cell_type": "markdown", + "id": "284bc813-dd75-49aa-bac2-10d1016fff46", + "metadata": {}, + "source": [ + "## Configure the primary timestamp\n", + "\n", + "The primary timestamp is required in every table, and is set in the `timestampSpec`. As the primary partitioning dimension, you must also use the same field to apply initial partitioning to your data - use `granularitySpec` to define how this is done.\n", + "\n", + "Run the next cell to set up a simple consumer and peek at the raw data being emitted from the Data Generator." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7128c2a4-588b-4bf6-a0f9-2f002f0ecdbc", + "metadata": {}, + "outputs": [], + "source": [ + "consumer = KafkaConsumer(\n", + " bootstrap_servers=kafka_host\n", + ")\n", + "\n", + "consumer.subscribe(topics=kafka_topic)\n", + "count = 0\n", + "\n", + "for message in consumer:\n", + " count += 1\n", + " if count == 5:\n", + " break\n", + " print (\"%d:%d: v=%s\" % (message.partition,\n", + " message.offset,\n", + " message.value))\n", + "\n", + "consumer.unsubscribe()" + ] + }, + { + "cell_type": "markdown", + "id": "e6693b19-be98-4a8b-b3a5-737823175f1d", + "metadata": {}, + "source": [ + "Each event includes a timestamp in the `time` field in ISO standard format that you will use as the `__time` field.\n", + "\n", + "Run the following cell to set up an object that you will incorporate into your final `dataSchema`. The `column` is set to `time`, which is the column from the generated data you will use as the primary timestamp. The [format](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#timestampspec) set as \"iso\"." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8869ba68-b144-4dd4-9105-6470abe51385", + "metadata": {}, + "outputs": [], + "source": [ + "dataSchema_timestampSpec = {\n", + " \"column\": \"time\",\n", + " \"format\": \"iso\"\n", + " }" + ] + }, + { + "cell_type": "markdown", + "id": "6ecd2912-553f-466c-a3ee-f5b793aee22e", + "metadata": {}, + "source": [ + "Next, you will define how your incoming events will be partitioned. Read more about this important design consideration in the official documentation on [partitioning](https://druid.apache.org/docs/latest/ingestion/partitioning) and [segment size optimization](https://druid.apache.org/docs/latest/operations/segment-optimization).\n", + "\n", + "Run the next cell to create an object that will be incorporated into the `dataSchema` as the `granularitySpec`. 
Notice that the primary partitioning for your table will be `HOUR`, and that ingestion-time aggregation ([rollup](https://druid.apache.org/docs/latest/ingestion/rollup)) will not be used." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b804095a-1b9f-48dd-9c13-11c1c083e8cd", + "metadata": {}, + "outputs": [], + "source": [ + "dataSchema_granularitySpec = {\n", + " \"rollup\": \"false\",\n", + " \"segmentGranularity\": \"hour\"\n", + " }" + ] + }, + { + "cell_type": "markdown", + "id": "02beae03-25e9-4ce0-a785-ccfb13ec36cb", + "metadata": {}, + "source": [ + "## Configure dimensions\n", + "\n", + "You have now created two objects that set up the primary timestamp, and turn attention to the third part of the `dataSchema`: the `dimensionsSpec`. Here you set what attributes and measures from the source data will be inserted into the table.\n", + "\n", + "You will see examples of:\n", + "\n", + "* Setting the dimensions explicitly.\n", + "* Excluding dimensions specifically.\n", + "* Using automatic schema detection." + ] + }, + { + "cell_type": "markdown", + "id": "b5844e30-a9a5-4051-8325-7b7f9300a400", + "metadata": {}, + "source": [ + "### Use `dimensions` to explicitly set the schema\n", + "\n", + "Use an array of [dimension objects](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimension-objects) to prescribe the specific attributes and measures that will be inserted and their type.\n", + "\n", + "Run the next cell to create a `dimensionsSpec` object that contains a `dimensions` array containing `dimensionObjects` with a name and target data type." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1987c712-8670-4fbd-b3c6-d072efb439a8", + "metadata": {}, + "outputs": [], + "source": [ + "dataSchema_dimensionsSpec = {\n", + " \"dimensions\": [\n", + " \"username\",\n", + " \"post_title\",\n", + " {\n", + " \"name\" : \"views\",\n", + " \"type\" : \"long\" },\n", + " {\n", + " \"name\" : \"upvotes\",\n", + " \"type\" : \"long\" },\n", + " {\n", + " \"name\" : \"comments\",\n", + " \"type\" : \"long\" }\n", + " ]\n", + " }" + ] + }, + { + "cell_type": "markdown", + "id": "65e24a56-3d9e-4589-8d0c-113a5b5ec8e9", + "metadata": {}, + "source": [ + "Run the next cell to create the final `dataSchema` by combining the `timestampSpec`, `granularitySpec`, and `dimensionsSpec`, along with the `dataSource` set to the target name for your table." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6b3ff426-d93e-42d6-99f3-a2adb4be5f56", + "metadata": {}, + "outputs": [], + "source": [ + "table_name = kafka_topic\n", + "\n", + "dataSchema = {\n", + " \"dataSource\": table_name,\n", + " \"timestampSpec\": dataSchema_timestampSpec,\n", + " \"dimensionsSpec\": dataSchema_dimensionsSpec,\n", + " \"granularitySpec\": dataSchema_granularitySpec\n", + " }\n", + "\n", + "print(json.dumps(dataSchema,indent=3))" + ] + }, + { + "cell_type": "markdown", + "id": "9d222ea7-ccf6-4cf5-b0c6-369a4416323c", + "metadata": {}, + "source": [ + "Run the next cell to incorporate this with the `ioConfig` and `tuningConfig` to create a native [ingestion specification](https://druid.apache.org/docs/latest/ingestion/ingestion-spec)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c69af4ad-00e2-4d9b-b7fc-5725dfe9e040", + "metadata": {}, + "outputs": [], + "source": [ + "ingestionSpec = {\n", + " \"type\": \"kafka\",\n", + " \"spec\": {\n", + " \"ioConfig\": ioConfig,\n", + " \"tuningConfig\": tuningConfig,\n", + " \"dataSchema\": dataSchema\n", + " }\n", + "}" + ] + }, + { + "cell_type": "markdown", + "id": "fe50d1bb-1bc9-4ef1-a1a7-637cc3bb4616", + "metadata": {}, + "source": [ + "Run the next cell to start ingesting raw data from Kafka to Druid." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "92aa5125-1096-462a-a637-cd8f438a2074", + "metadata": {}, + "outputs": [], + "source": [ + "requests.post(f\"{druid_host}/druid/indexer/v1/supervisor\", json.dumps(ingestionSpec), headers=datagen_headers)\n", + "druid.sql.wait_until_ready(table_name, verify_load_status=False)\n", + "display.table(table_name)" + ] + }, + { + "cell_type": "markdown", + "id": "2654936f-8288-4fd5-a4ad-1f94267e43ae", + "metadata": {}, + "source": [ + "Notice that the `dimensionsSpec` has caused Druid to apply a type of BIGINT to `views`, `upvotes`, and `comments`.\n", + "\n", + "Learn more about data types in the dedicated [notebook on data types](../02-ingestion/04-table-datatypes.ipynb).\n", + "\n", + "Before moving on, stop the data generator."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6118c6d0-46a0-40e2-ad38-a740205d2236", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"Stop streaming generator: [{requests.post(f'{datagen_host}/stop/{datagen_job}','')}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "3ca324d3-26c5-4033-a102-4f73a80401d1", + "metadata": {}, + "source": [ + "### Use `dimensionExclusions` to explicitly exclude dimensions\n", + "\n", + "Run the next cell to switch the `dimensionsSpec` object to use the \"exclusion\" method for ingesting events.\n", + "\n", + "Notice the `dimensionExclusions` array contains the names of dimensions that will be ignored from the incoming events." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c727467c-e6af-4879-80e1-c2cac368d6e5", + "metadata": {}, + "outputs": [], + "source": [ + "dataSchema_dimensionsSpec = {\n", + " \"dimensionExclusions\": [\n", + " \"username\",\n", + " \"edited\"\n", + " ]\n", + " }" + ] + }, + { + "cell_type": "markdown", + "id": "a23d7546-8e58-473a-b482-37ed8176c88f", + "metadata": {}, + "source": [ + "Incorporate this into an ingestion specification by running the next cell." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21542771-2f28-4ca3-8bb6-f051f66b5cfc", + "metadata": {}, + "outputs": [], + "source": [ + "dataSchema = {\n", + " \"dataSource\": table_name,\n", + " \"timestampSpec\": dataSchema_timestampSpec,\n", + " \"dimensionsSpec\": dataSchema_dimensionsSpec,\n", + " \"granularitySpec\": dataSchema_granularitySpec\n", + " }\n", + "\n", + "ingestionSpec = {\n", + " \"type\": \"kafka\",\n", + " \"spec\": {\n", + " \"ioConfig\": ioConfig,\n", + " \"tuningConfig\": tuningConfig,\n", + " \"dataSchema\": dataSchema\n", + " }\n", + "}\n", + "\n", + "print(json.dumps(ingestionSpec, indent=5))" + ] + }, + { + "cell_type": "markdown", + "id": "ef19f727-67ed-4090-b83a-5fefb2d7bd89", + "metadata": {}, + "source": [ + "Submit the revised specification for this table to Druid by running the next cell." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c877e620-0558-4e58-b799-5ddc5e5862af", + "metadata": {}, + "outputs": [], + "source": [ + "requests.post(f\"{druid_host}/druid/indexer/v1/supervisor\", json.dumps(ingestionSpec), headers=datagen_headers)" + ] + }, + { + "cell_type": "markdown", + "id": "55307fc5-b17f-44e8-af2a-a3c6eb095731", + "metadata": {}, + "source": [ + "Restart the data generator by running the next cell.\n", + "\n", + "Notice that the `time_type` is a year later, meaning that the new set of events will have a later timestamp." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10ad306a-c4aa-4c22-b259-be8252b62c2b", + "metadata": {}, + "outputs": [], + "source": [ + "datagen_starttime = \"2021-01-01 00:00:00\"\n", + "\n", + "datagen_request = {\n", + " \"name\": datagen_job,\n", + " \"target\": target,\n", + " \"config_file\": \"social/social_posts.json\",\n", + " \"concurrency\":50,\n", + " \"time_type\": datagen_starttime\n", + "}\n", + "\n", + "requests.post(f\"{datagen_host}/start\", json.dumps(datagen_request), headers=datagen_headers)\n", + "requests.get(f\"{datagen_host}/status/{datagen_job}\").json()" + ] + }, + { + "cell_type": "markdown", + "id": "196248ac-4954-45a6-90a2-5e6d25ef8619", + "metadata": {}, + "source": [ + "The table will now contain two sets of events:\n", + "\n", + "* Events from 2020 that were ingested using an explicit `dimensionsSpec`.\n", + "* Events from 2021 that are currently being ingested using `dimensionExclusions`.\n", + "\n", + "With schema exclusion, `views`, `upvotes`, and `comments` after 2021 will have an internal VARCHAR type.\n", + "\n", + "Run the next cell to show the difference in the data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2e95fae6-7408-47b7-ad7b-11e4dba22d1b", + "metadata": {}, + "outputs": [], + "source": [ + "sql=f'''\n", + "SELECT *\n", + "FROM \"{table_name}\"\n", + "WHERE TIME_IN_INTERVAL(\"__time\",'2020-01-01/PT1H')\n", + "'''\n", + "\n", + "print(\"Using explicit inclusions:\")\n", + "display.sql(sql)\n", + "\n", + "sql=f'''\n", + "SELECT *\n", + "FROM \"{table_name}\"\n", + "WHERE TIME_IN_INTERVAL(\"__time\",'2021-01-01/PT1H')\n", + "'''\n", + "\n", + "print(\"Using explicit exclusions:\")\n", + "display.sql(sql)" + ] + }, + { + "cell_type": "markdown", + "id": "76e66285-c411-442e-9bdf-0b53e3322e57", + "metadata": {}, + "source": [ + "Stop the data generator by running the next cell." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ea728193-bfb0-46e4-9984-25549297efb8", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"Stop streaming generator: [{requests.post(f'{datagen_host}/stop/{datagen_job}','')}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "eb153971-e522-4769-adbe-bd21d95014e4", + "metadata": {}, + "source": [ + "### Use automatic schema discovery\n", + "\n", + "Now set up your `dimensionsSpec` to instruct Druid to discover dimensions and determine a data type automatically by running the next cell by setting `useSchemaDiscovery` to `true`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "33efe405-2758-47cd-a532-1c820a3f4c9e", + "metadata": {}, + "outputs": [], + "source": [ + "dataSchema_dimensionsSpec = {\n", + " \"useSchemaDiscovery\" : \"true\" }\n", + "\n", + "dataSchema = {\n", + " \"dataSource\": table_name,\n", + " \"timestampSpec\": dataSchema_timestampSpec,\n", + " \"dimensionsSpec\": dataSchema_dimensionsSpec,\n", + " \"granularitySpec\": dataSchema_granularitySpec\n", + " }\n", + "\n", + "ingestionSpec = {\n", + " \"type\": \"kafka\",\n", + " \"spec\": {\n", + " \"ioConfig\": ioConfig,\n", + " \"tuningConfig\": tuningConfig,\n", + " \"dataSchema\": dataSchema\n", + " }\n", + "}\n", + "\n", + "print(json.dumps(ingestionSpec, indent=5))" + ] + }, + { + "cell_type": "markdown", + "id": "20470873-bf5b-48ee-8842-22ccdcd084af", + "metadata": {}, + "source": [ + "Submit the revised specification for this table to Druid by running the next cell.\n", + "\n", + "Because automatic schema detection has been used, `views`, `upvotes`, and `comments` will be set as BIGINT." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "84e52d35-6f29-4747-b706-4a16672160da", + "metadata": {}, + "outputs": [], + "source": [ + "requests.post(f\"{druid_host}/druid/indexer/v1/supervisor\", json.dumps(ingestionSpec), headers=datagen_headers)" + ] + }, + { + "cell_type": "markdown", + "id": "01435e75-fc37-4536-9b59-1c179650c3cb", + "metadata": {}, + "source": [ + "Run the next cell to restart the data generator, this time for 2022." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0a185ac2-d0d1-4ffe-aacc-682de7ec6673", + "metadata": {}, + "outputs": [], + "source": [ + "datagen_starttime = \"2022-01-01 00:00:00\"\n", + "\n", + "datagen_request = {\n", + " \"name\": datagen_job,\n", + " \"target\": target,\n", + " \"config_file\": \"social/social_posts.json\",\n", + " \"concurrency\":50,\n", + " \"time_type\": datagen_starttime\n", + "}\n", + "\n", + "requests.post(f\"{datagen_host}/start\", json.dumps(datagen_request), headers=datagen_headers)\n", + "requests.get(f\"{datagen_host}/status/{datagen_job}\").json()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e4daaa9-3bc5-40f8-a4d5-869a1e73f92f", + "metadata": {}, + "outputs": [], + "source": [ + "sql=f'''\n", + "SELECT *\n", + "FROM \"{table_name}\"\n", + "WHERE TIME_IN_INTERVAL(\"__time\",'2020-01-01/PT1H')\n", + "'''\n", + "\n", + "print(\"Using explicit inclusions:\")\n", + "display.sql(sql)\n", + "\n", + "sql=f'''\n", + "SELECT *\n", + "FROM \"{table_name}\"\n", + "WHERE TIME_IN_INTERVAL(\"__time\",'2021-01-01/PT1H')\n", + "'''\n", + "\n", + "print(\"Using explicit exclusions:\")\n", + "display.sql(sql)\n", + "\n", + "sql=f'''\n", + "SELECT *\n", + "FROM \"{table_name}\"\n", + "WHERE TIME_IN_INTERVAL(\"__time\",'2022-01-01/PT1H')\n", + "'''\n", + "\n", + "print(\"Using automatic schema discovery:\")\n", + "display.sql(sql)" + ] + }, + { + "cell_type": "markdown", + "id": "44738d6d-cec2-40ad-aaba-998c758c63f4", + "metadata": {}, +
"source": [ + "## Clean up\n", + "\n", + "Run the following cell to stop the data generator and drop the table." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8082b545-ba7f-4ede-bb6e-2a6dd62ba0d8", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"Stop streaming generator: [{requests.post(f'{datagen_host}/stop/{datagen_job}','')}]\")\n", + "\n", + "print(f'Pause streaming ingestion: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{table_name}/suspend\",\"\")}]')\n", + "print(f'Shutting down running tasks ...')\n", + "\n", + "tasks = druid.tasks.tasks(state='running', table=table_name)\n", + "while len(tasks)>0:\n", + " for task in tasks:\n", + " print(f\"...stopping task [{task['id']}]\")\n", + " druid.tasks.shut_down_task(task['id'])\n", + " tasks = druid.tasks.tasks(state='running', table=table_name)\n", + "\n", + "print(f'Terminate streaming ingestion: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{table_name}/terminate\",\"\")}]')\n", + "print(f\"Drop datasource: [{druid.datasources.drop(table_name)}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "54b8d5fe-ba85-4b5b-9669-0dd47dfbccd1", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "* The schema of incoming data is defined in the `dimensionsSpec` and is realized in the target table.\n", + "* Dimensions can be explicitly included and typed, explicitly excluded, or automatically detected and typed.\n", + "\n", + "## Learn more\n", + "\n", + "* Review the documentation on the [`dimensionsSpec`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec).\n", + "* Review the documentation on [partitioning](https://druid.apache.org/docs/latest/ingestion/partitioning) and [segment size optimization](https://druid.apache.org/docs/latest/operations/segment-optimization).\n", + "* Run through the dedicated [notebook on data types](../02-ingestion/04-table-datatypes.ipynb).\n", + "* Learn about [changing 
schemas](https://druid.apache.org/docs/latest/data-management/schema-changes) in Druid.\n", + "* Experiment with combining batch and streaming data in the same table." + ] + } + ], + "metadata": { + "execution": { + "allow_errors": true, + "timeout": 300 + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}