Automatic Continuous Testing for Kafka-like Streaming Platforms
AutoMQ[1], as a streaming system, is widely used in critical customer operations that demand high reliability. A simulated, long-running test environment that replicates real-world production scenarios is therefore essential for verifying that SLAs can be met. That assurance underpins confidence in releasing new versions and in customer adoption. With this objective, we built Marathon, an automated, continuous testing platform for streaming systems. Before rolling out the Marathon framework, we established three key design principles:
- Scalable: The platform must accommodate the growth of test cases and deployment modes as the system under test evolves.
- Observable: Being a testing platform, encountering bugs is expected. Thus, robust debugging tools are essential for pinpointing and resolving root causes.
- Cost-effective: Given the fluctuating traffic patterns in test scenarios, resource consumption should dynamically adjust according to traffic changes.
These three principles guided subsequent technology choices and architectural decisions.
Let’s begin with an overview of the architecture diagram
Marathon's Controller and Workers, together with the AutoMQ Enterprise Edition control plane, all run inside Kubernetes (K8s):
- Controller: Interacts with the AutoMQ Enterprise Edition control plane within the same VPC to create, modify, and delete Kafka clusters. It also coordinates test tasks and manages the number and configuration of Workers.
- Worker: Runs Kafka clients to generate the workload that tasks require, reports observability data, and performs client-side SLA assessments.
- AutoMQ Enterprise Edition control plane: Provides a comprehensive set of productized features for the data plane, including cluster lifecycle management, observability, security auditing, and cluster reassignment. Marathon mainly uses its cluster lifecycle management OpenAPI to create, modify, and destroy clusters throughout the testing process.
The Controller and Workers form a distributed system. The Controller works much like a K8s Operator: a reconciliation loop dynamically adjusts the number and configuration of Workers to match task demands. Workers are fully stateless; they report events to the Controller, which takes the corresponding actions. This design makes the architecture flexible enough to scale with task demands. Because Workers are lightweight and can be scaled dynamically, they can even run on Spot instances[2], which considerably lowers operating costs and makes ultra-large-scale elastic tasks feasible.
The Controller is designed for resource management and task orchestration, initiating several resource managers at the outset:
- Service Discovery: Monitors the operational status of Workers.
- Event Bus: Acts as the communication channel with Workers.
- Alert Service: Alerts administrators to events requiring immediate attention.
- Kafka Cluster Manager: Oversees the status of Kafka clusters, tracks Kafka release updates, and manages upgrades.
- Signal Processor: Detects SIGTERM to begin the termination process, reclaiming any resources that were created.
The Controller accommodates various types of Kafka clusters:
- Existing Kafka clusters: Used to quickly verify the functionality of a designated cluster.
- Managed Kafka clusters: The Controller oversees the entire lifecycle of these clusters, using the control plane capabilities of AutoMQ to create and destroy them.
The Controller uses a mechanism similar to a K8s Operator: in each reconciliation cycle it adjusts the number and configuration of Workers to match task requirements. Each task corresponds to a test scenario. Tasks are written in code to send messages to and receive messages from Kafka, building various traffic models for black-box testing.
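The Operator-style adjustment of Workers described above can be pictured with a minimal sketch. It is not Marathon's implementation: `WorkerPool` and `WorkerReconciler` are hypothetical names, and an in-memory list stands in for the real K8s Deployment. It only shows the core idea of comparing a desired Worker count with the actual one and closing the gap.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for the set of Workers the Controller manages. */
class WorkerPool {
    private final List<String> workers = new ArrayList<>();
    private int nextId = 0;

    int size() {
        return workers.size();
    }

    void addWorker() {
        workers.add("worker-" + nextId++);
    }

    void removeWorker() {
        workers.remove(workers.size() - 1); // drop the most recently added Worker
    }
}

/** Hypothetical reconciler: one pass moves the actual state toward the desired state. */
class WorkerReconciler {
    /** Returns the number of Workers added or removed during this pass. */
    static int reconcile(WorkerPool pool, int desired) {
        int changes = 0;
        while (pool.size() < desired) {
            pool.addWorker();
            changes++;
        }
        while (pool.size() > desired) {
            pool.removeWorker();
            changes++;
        }
        return changes;
    }
}
```

Calling `reconcile` repeatedly is harmless: once the pool matches the desired count, a pass makes no changes, which is the property that lets such a loop run continuously.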
Each task is divided into four stages, sequentially executed within the same thread:
- Resource creation
- Warm-up
- Running task load
- Resource recovery
The Marathon framework provides a comprehensive set of utility classes designed to streamline the process of task creation. These include functionalities for generating Kafka topics, managing consumer backlogs, adjusting worker traffic, monitoring specific events, and introducing faults into Kafka clusters. Paired with Workers, these tools facilitate the simulation of traffic across any scale and enable testing in unique scenarios, such as large-scale cold reads or the deliberate shutdown of a Kafka node to assess data integrity.
Because tasks are written as code, they can express arbitrary scenarios; the only restriction is that they must avoid non-interruptible blocking operations. If a Worker's Spot instance is reclaimed, the Controller interrupts the task thread, reclaims resources, and retries the task as needed.
Conducting stress tests on a Kafka cluster can demand bandwidths exceeding tens of GB/s, clearly surpassing the capabilities of a single machine. Thus, designing a distributed system becomes imperative. The initial step involves determining how to locate newly established Workers and communicate with them. Our decision to manage the system with Kubernetes (K8s) naturally leads us to employ K8s mechanisms for service discovery.
We conceptualize a collection of identically configured Workers as a Worker Deployment, aligning with the Deployment model in K8s. Each Worker functions as a Pod within this Deployment. Creating Workers through the Controller is comparable to deploying a Deployment to the API Server and awaiting the activation of all Pods, as illustrated in Steps 1 and 2. K8s nodes scale appropriately, provisioning the necessary Spot instance virtual machines.
Upon initialization, each Worker creates a ConfigMap that lists the events it is interested in, initially only initialization events (Step 3). The Controller watches for newly created ConfigMaps using the K8s Watch API (Step 4) and then dispatches initialization events containing configurations to these Workers (Step 5).
This completes the service discovery and initialization process for Workers. Workers then update their ConfigMaps to subscribe to additional events of interest. This service discovery mechanism gives the Controller the ability to create Workers dynamically and lays the groundwork for the event bus described in the next section.
With the service discovery mechanism discussed previously, the Controller now knows the service address of each Worker (Pod IP plus port) and the events each Worker is interested in (by watching ConfigMap changes), so it can push events directly to specific Workers.
Numerous RPC frameworks are available, and Marathon has opted for Vert.x. It supports the traditional request-reply communication model as well as the multi-receiver publish-subscribe model, which proves invaluable in scenarios where multiple nodes must acknowledge an event (illustrated in the figure for the Adjust throughput command).
As the preceding sections show, Workers can be created on demand by tasks, and commands that run tasks on Workers are dispatched through the event bus (as illustrated in the figure for the Initialize new worker command). Because Workers are stateless and can be created or destroyed quickly, running them on Spot Instances is viable (the Controller, which needs few resources, can run on a smaller Reserved Instance).
The Controller employs Kubernetes' Watch API to monitor the status of Pods, pausing and restarting the current task upon detecting an unexpected termination of a Pod. This enables prompt detection and mitigation of task impacts during the reclamation of Spot Instances. Spot Instances, derived from the excess capacity of cloud providers, offer significant cost savings compared to Reserved Instances. By leveraging Spot Instances, Marathon can drastically cut the costs of executing tasks with lower stability demands over prolonged periods.
Marathon test scenarios are defined in code by extending an abstract class, defining the test case configuration, and implementing its lifecycle methods. Here are some of the existing test scenarios:
Test case configurations use generics. Taking CatchUpReadTask as an example, the class is declared as:
public class CatchUpReadTask extends AbstractTask<CatchUpReadTaskConfig>
The related configuration class, CatchUpReadTaskConfig, defines the parameters required to run this task, which users can set dynamically.
Each task scenario is characterized through the implementation of the following lifecycle methods to simulate a specific traffic pattern:
- prepare: Establish the necessary resources for the task
- warmup: Ready the Worker and the cluster for testing
- workload: Generate the task workload
- cleanup: Remove the resources established for the task
Taking CatchUpReadTask as an example:
The Workload stage is the key differentiator among various task scenarios, where the CatchUpReadTask needs to build an appropriate backlog volume and then ensure it can be consumed within 5 minutes. For ChaosTask, the approach shifts to terminating a node and verifying that its partitions can be reassigned to other nodes within 1 minute. To cater to the diverse requirements of these tasks, the Marathon framework offers a toolkit for crafting test scenarios, as illustrated in the figure above:
- KafkaUtils: Create/Delete Topic (a resource type within Kafka clusters)
- WorkerDeployment: Create Worker
- ThroughputChecker: Continuously monitor whether the throughput meets the expected standards
- AwaitUtils: Confirm that the piled-up messages can be consumed within five minutes
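To make the "within five minutes" style of check concrete, here is a minimal sketch of a polling wait with a deadline. It is an illustration only: `AwaitSketch.awaitUntil` is a hypothetical helper, not the actual AwaitUtils API, whose signatures are not shown in this article. The wait uses `Thread.sleep`, so it stays interruptible, in line with the rule that tasks must avoid non-interruptible blocking.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper; the real AwaitUtils in Marathon may look different. */
class AwaitSketch {
    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition held in time, false on timeout or interruption.
     */
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed without the condition holding
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
    }
}
```

A catch-up read scenario could, for example, pass a condition that reports whether the consumer backlog has reached zero, with a five-minute timeout.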
With a variety of implementations of AbstractTask, a wide range of testing scenarios is possible. Orchestrating different task stages and even distinct tasks is essential for the Controller to execute the aforementioned scenarios.
A look at the other methods in AbstractTask shows that it implements the Runnable interface. Its run method executes the lifecycle stages in sequence (prepare, warmup, workload, and cleanup), so a Task can be handed to a thread for execution.
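The shape described above might look roughly like the following sketch. The class name `AbstractTaskSketch`, the method signatures, and the choice to run cleanup in a `finally` block are assumptions made for illustration; the article does not show the real AbstractTask source.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical outline of a task base class; C is the task's configuration type. */
abstract class AbstractTaskSketch<C> implements Runnable {
    protected final C config;

    protected AbstractTaskSketch(C config) {
        this.config = config;
    }

    protected abstract void prepare() throws Exception;
    protected abstract void warmup() throws Exception;
    protected abstract void workload() throws Exception;
    protected abstract void cleanup();

    @Override
    public void run() {
        try {
            prepare();
            warmup();
            workload();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            cleanup(); // assumption: resources are released even if a stage fails
        }
    }
}

/** Toy task that only records the order in which its stages ran. */
class RecordingTask extends AbstractTaskSketch<String> {
    final List<String> stages = new ArrayList<>();

    RecordingTask(String config) {
        super(config);
    }

    @Override protected void prepare() { stages.add("prepare"); }
    @Override protected void warmup() { stages.add("warmup"); }
    @Override protected void workload() { stages.add("workload"); }
    @Override protected void cleanup() { stages.add("cleanup"); }
}
```

Because the base class is a `Runnable`, `new Thread(new RecordingTask("demo")).start()` is all that is needed to run the whole lifecycle on its own thread.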
Upon initialization, the Controller sets up a task loop, constructs the required Task objects based on user specifications, and activates them by invoking the start method to launch a new thread for each task. The Controller then employs the join method to await the completion of each Task's lifecycle before moving on to the next one. This cycle is repeated to maintain the stability of the system under test.
In the event of unrecoverable errors (such as Spot instances being reclaimed) or when operational commands are manually executed to interrupt the task, the Controller calls the interrupt method on the current Task to halt the thread and stop the task. The task loop then handles resource recovery, proceeds with the next task, or pauses, awaiting further instructions based on the situation.
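The start, join, and interrupt flow can be sketched with plain Java threads. This is a simplified illustration, not the Controller's code: `TaskLoop` is a hypothetical name, and a timeout stands in for the real interruption triggers (Spot reclamation or a manual command).

```java
import java.util.List;

/** Hypothetical task loop: runs tasks one at a time, each on its own thread. */
class TaskLoop {
    /** Returns how many tasks finished on their own, without being interrupted. */
    static int runAll(List<Runnable> tasks, long timeoutMillis) {
        int completed = 0;
        for (Runnable task : tasks) {
            Thread thread = new Thread(task, "marathon-task");
            thread.start();
            try {
                thread.join(timeoutMillis);  // wait for the task's lifecycle to finish
                if (thread.isAlive()) {
                    thread.interrupt();      // stand-in for an unrecoverable error or manual stop
                    thread.join();           // wait for the task to unwind
                } else {
                    completed++;
                }
            } catch (InterruptedException e) {
                thread.interrupt();                 // the loop itself was asked to stop
                Thread.currentThread().interrupt();
                break;
            }
        }
        return completed;
    }
}
```

The sketch only works if tasks respond to interruption, which is why the framework asks task authors to avoid non-interruptible blocking operations.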
The framework groups assertions by the type of metric they check:
- Client-side assertions: These include message continuity assertions and transaction isolation level assertions.
- Server-side state assertions: These include traffic threshold assertions and load balancing assertions.
- Time-based assertions: These include backlog accumulation duration assertions, task timeout verifications, and more.
If standard assertion rules are insufficient, the Checker interface can be implemented to tailor custom assertions as needed
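As an illustration of what a custom assertion could look like, the sketch below checks message continuity on a stream of sequence numbers. The interface `SketchChecker` and its single method are assumptions for this example; the article does not show the real Checker interface, which may have a different shape.

```java
import java.util.Optional;

/** Hypothetical checker contract; not the actual Marathon Checker interface. */
interface SketchChecker<T> {
    /** Returns a failure description, or empty if the observation passes. */
    Optional<String> check(T observation);
}

/** Asserts that sequence numbers arrive in order, starting at 0, with no gaps or repeats. */
class ContinuityChecker implements SketchChecker<Long> {
    private long expected = 0;

    @Override
    public Optional<String> check(Long sequence) {
        if (sequence != expected) {
            String failure = "expected sequence " + expected + " but got " + sequence;
            expected = sequence + 1; // resynchronize so a single gap is reported once
            return Optional.of(failure);
        }
        expected++;
        return Optional.empty();
    }
}
```

In an event-driven setup, a non-empty result would be turned into an event with a severity level rather than thrown as an exception.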
Building a robust system necessitates essential observability tools; without them, monitoring is reduced to passively observing alerts. The Marathon framework efficiently collects runtime data from Controllers and Workers, and it non-intrusively captures observability data from the tested systems. Utilizing Grafana's visualization tools, one can easily examine metrics, logs, profiling, and other observability data
In an event-driven architecture, unsatisfied assertions trigger specific events with varying severity levels. Alerts are issued for those events that require immediate attention from operational staff and are sent to the OnCall group for assessment. Combined with observability data, this approach enables quick and accurate issue identification, allows preemptive action by customers to address and mitigate potential risks, and facilitates ongoing performance optimization
Returning to our three design principles (scalability, observability, and cost-efficiency), the Marathon framework has to answer the following questions from the outset:
- How can we build resilient loads for various task scenarios?
- Considering the different resource demands of these loads, is it possible for the underlying machine resources to dynamically scale accordingly?
- Costs are categorized into usage costs and operational costs.
  - In terms of usage costs, how can we quickly create and dismantle resources to reduce barriers for users?
  - As for operational costs, how can we efficiently construct the required loads using the fewest resources possible?
- Marathon leverages Spot instances, K8s, and stateless Workers to address these problems, representing the infrastructure layer, operational management layer, and application layer respectively.
Given the demand for both flexibility and cost-efficiency, Spot instances in the cloud are the obvious choice; they can be priced as low as roughly 10% of comparable On-Demand instances. The trade-off is that Spot instances can be reclaimed unpredictably, which is a significant architectural hurdle for many applications. For Marathon this is less of a concern, because tasks can be rerun as needed.
The most straightforward design strategy is essentially no design: Marathon focuses on scenario description and task orchestration and leaves scheduling to K8s. Marathon only decides how large the workload must be and how many cores each workload unit needs; K8s manages the elasticity of the underlying resources. Once a Spot instance node group has been requested up front, attention can go entirely to the logic of the test scenario.
Nonetheless, the capability to utilize the benefits of Spot instances and K8s hinges on the application being stateless; otherwise, managing state persistence and reassignment becomes essential. This consideration is crucial in the design of the Worker module.
Marathon exhibits excellent abstraction in many of its modules, including service discovery, task scheduling, and load generation, all of which are readily adaptable to other contexts:
- Service discovery: Currently built on the APIs of the K8s API server, with the data structures abstracted into Node and Registration. Node holds the address and port of a Worker, while Registration holds the events each Worker is interested in. Any shared storage that can hold these two structures, such as MySQL or Redis, can therefore serve as the service discovery backend.
- Task scheduling: Workers are currently packaged as Docker images and deployed via a K8s Deployment. Alternatively, they could be packaged as AMIs and launched directly on EC2 through cloud APIs, or deployed using tools such as Vagrant and Ansible.
- Load generation: Currently, each Worker runs a Kafka workload, which mainly means running a specific number of Kafka clients that send and receive messages as dictated by the Controller's settings. Replacing the Kafka clients with RocketMQ clients or HTTP clients can be accomplished with minimal effort.
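The service discovery abstraction in the list above can be illustrated with a small sketch. Only the names Node and Registration come from the article; their fields, the `RegistrationStore` interface, the in-memory implementation, and the event names used in the usage note are assumptions. The point is that the Controller only needs a store answering "which Workers subscribed to this event", whatever backs it.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Address of a Worker (field names are assumptions). */
record Node(String host, int port) {}

/** The events a given Worker has subscribed to. */
record Registration(Node node, Set<String> events) {}

/** Hypothetical storage-agnostic contract; could be backed by ConfigMaps, MySQL, or Redis. */
interface RegistrationStore {
    void register(Registration registration);

    List<Node> subscribers(String event);
}

/** In-memory implementation, useful for illustrating the contract or for local tests. */
class InMemoryRegistrationStore implements RegistrationStore {
    private final Map<Node, Registration> byNode = new ConcurrentHashMap<>();

    @Override
    public void register(Registration registration) {
        // Registering again replaces the Worker's previous subscription set.
        byNode.put(registration.node(), registration);
    }

    @Override
    public List<Node> subscribers(String event) {
        return byNode.values().stream()
                .filter(r -> r.events().contains(event))
                .map(Registration::node)
                .collect(Collectors.toList());
    }
}
```

A Worker that first subscribes only to an `init` event and later adds `adjust-throughput` would simply call `register` twice with the same Node; the second call overwrites the first.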
Thanks to its robust abstraction features, Marathon's dependencies on external systems are modular and pluggable. Consequently, it functions not only as a continuous reliability testing platform for Kafka, but can also be seamlessly adapted to assess any distributed system, whether it operates in cloud-based or on-premises environments.
[1] AutoMQ: https://github.com/AutoMQ/automq
[2] Spot Instance: https://docs.aws.amazon.com/zh_cn/AWSEC2/latest/UserGuide/using-spot-instances.html