
AutoMQ x StarRocks: Powering Real Time Analytics


StarRocks is a cutting-edge analytical data warehouse that leverages technologies such as vectorization, MPP architecture, CBO, intelligent materialized views, and a columnar storage engine with capabilities for real-time updates. It supports multidimensional, real-time, and high-concurrency data analysis.

This article explores how to use StarRocks Routine Load for importing data from AutoMQ into StarRocks. To dive deeper into the essentials of Routine Load, refer to the Routine Load Fundamentals documentation.

Environment Preparation

Set up StarRocks and prepare the test data.

Ensure a StarRocks cluster is readily available. For demonstration purposes, this article follows Using Docker to Deploy StarRocks to install a demo cluster on a Linux machine.
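
The commands below are a minimal sketch of that quickstart, assuming the starrocks/allin1-ubuntu all-in-one image and the default ports used in the StarRocks documentation; adjust the image tag, ports, and host for your environment.

# Start an all-in-one StarRocks demo container (FE and BE in a single image)
docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 -itd --name quickstart starrocks/allin1-ubuntu

# Once the container is up, connect to the FE with any MySQL-compatible client
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "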

Create a test database and a test table using the Primary Key model:

create database automq_db;
use automq_db;
create table users (
    id bigint NOT NULL,
    name string NOT NULL,
    timestamp string NULL,
    status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
    "replication_num" = "1",
    "enable_persistent_index" = "true"
);

Prepare AutoMQ and test data

Refer to Stand-alone Deployment to deploy AutoMQ, ensuring network connectivity between AutoMQ and StarRocks.

Quickly create a topic named example_topic in AutoMQ and write test JSON data into it by following the steps below.

Create Topic

Use the Apache Kafka command line tool to create a topic, ensuring that you have access to a Kafka environment and that the Kafka service is operational. Here is an example command to create a topic:

./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

When executing the command, replace topic and bootstrap-server with the actual topic name and Kafka server address you are using.

After creating the topic, you can use the following command to check if the topic has been successfully created.

./kafka-topics.sh --describe --topic example_topic --bootstrap-server 10.0.96.4:9092

Generate test data

Produce JSON-formatted test data that corresponds to the table created earlier.

{
  "id": 1,
  "name": "测试用户",
  "timestamp": "2023-11-10T12:00:00",
  "status": "active"
}

Write test data

Employ Kafka's command-line tools or programmatic approaches to write the test data into a topic named 'example_topic'. Here's how you can do it using the command-line tool:

echo '{"id": 1, "name": "testuser", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

To review the data that has just been written to the topic, execute the following command:

sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

When executing the commands, replace topic and bootstrap-server with the actual topic name and Kafka server address you are using.

Creating a Routine Load import job

In the StarRocks command line, create a Routine Load job to continuously import data from the AutoMQ Kafka topic.

CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
    "desired_concurrent_number" = "5",
    "format" = "json",
    "jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
 )
FROM KAFKA
(
    "kafka_broker_list" = "10.0.96.4:9092",
    "kafka_topic" = "example_topic",
    "kafka_partitions" = "0",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

When executing the command, replace kafka_broker_list with the actual Kafka server address you are using.

Parameter Description

Data Format

Specify the data format as JSON by setting "format" = "json" in the PROPERTIES clause.

Data Extraction and Transformation

The COLUMNS and jsonpaths parameters define the mapping and transformation between the source data and the target table columns. jsonpaths extracts the required fields from the JSON data, much like generating new CSV data, and COLUMNS then maps those extracted fields, in order, onto the columns of the target table. For additional information on data transformation, please refer to the Data Transformation During Import documentation.

If the keys of each JSON object match the target table's columns in name and number (order does not matter), configuring COLUMNS is not necessary.
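
As a hypothetical illustration of when the mapping matters, the sketch below assumes the JSON field is named user_id rather than id; jsonpaths extracts the fields in order and COLUMNS maps them positionally onto the table columns. The job name and the user_id field are assumptions for this example only.

-- Sketch: map a JSON field "user_id" onto the table column "id" via positional mapping
CREATE ROUTINE LOAD automq_example_load_mapped ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
    "format" = "json",
    "jsonpaths" = "[\"$.user_id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.0.96.4:9092",
    "kafka_topic" = "example_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);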

Validate Data Import

Initially, verify the status of the Routine Load import job to confirm that the task is active.

show routine load\G
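
If the job state reported there is PAUSED rather than RUNNING, it can be resumed by name; this uses the standard StarRocks Routine Load management statement and is shown here only as a sketch:

-- Resume the job if SHOW ROUTINE LOAD reports a PAUSED state
RESUME ROUTINE LOAD FOR automq_example_load;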

Next, examine the related tables in the StarRocks database to verify that the data has been successfully imported.

StarRocks > select * from users;
+------+--------------+---------------------+--------+
| id   | name         | timestamp           | status |
+------+--------------+---------------------+--------+
|    1 | testuser     | 2023-11-10T12:00:00 | active |
|    2 | testuser     | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)
