```
project-root/
│
├── docker-compose.yml            # Docker Compose file for setting up Kafka
├── requirements.txt              # Python dependencies
│
├── kafka_processor/              # Main processing folder
│   ├── main.py                   # Main script to run the pipeline
│   ├── producer.py               # Producer logic for Kafka
│   ├── consumer.py               # Consumer logic for Kafka
│   ├── transformer.py            # Message transformation logic
│   ├── summary_printer.py        # Summary statistics management
│   ├── metrics.py                # Metrics collection and management
│   └── Verify/                   # Verification scripts for testing data flow
│       ├── verify_processed_output.py   # Verifies 'processed-output' topic
│       ├── verify_processed_errors.py   # Verifies 'processed-errors' topic
│       ├── verify_summary_data.py       # Verifies 'summary-output' topic
│       ├── verify_cleaned_data.py       # Verifies 'cleaned-data' topic
│       └── verify_metrics.py            # Verifies 'metrics-output' topic
│
└── README.md                     # Project documentation and setup instructions
```
This project implements a real-time data processing pipeline using Apache Kafka. It consumes, transforms, and publishes login events while providing robust error handling, summary statistics, and performance metrics.
The pipeline consists of several key components and uses six Kafka topics to manage data flow and logging:
- **Topics:**
  - `user-login`: The source topic where raw login events are published.
  - `processed-output`: The destination topic for successfully transformed and processed messages.
  - `processed-errors`: A dedicated topic for logging errors encountered during processing, such as JSON decoding issues or missing fields.
  - `summary-output`: A topic for publishing summary statistics, including counts of processed messages, device types, locales, and filtered records.
  - `cleaned-data` (optional): A topic for logging messages filtered out based on specific criteria (e.g., `app_version`).
  - `metrics-output`: A topic for publishing performance metrics.
- **Components:**
  - Consumer: Subscribes to the `user-login` topic to read incoming messages.
  - Producer: Publishes transformed messages to `processed-output` and logs errors to `processed-errors`.
  - Transformer: Processes each message by hashing sensitive fields and formatting timestamps.
  - Summary Printer: Maintains processing statistics and publishes summaries to `summary-output`.
  - Metrics Collector: Tracks and publishes performance metrics.
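The exact client setup lives in `consumer.py` and `producer.py`; as a rough, self-contained illustration (assuming `confluent-kafka` and a local broker at `localhost:9092`, with a hypothetical consumer group id), the topics and clients might be configured like this:

```python
# Minimal sketch of the client setup, assuming a local broker and the
# confluent-kafka library; the group id is a hypothetical example.
from confluent_kafka import Consumer, Producer

SOURCE_TOPIC = "user-login"
OUTPUT_TOPIC = "processed-output"
ERROR_TOPIC = "processed-errors"
SUMMARY_TOPIC = "summary-output"
CLEANED_TOPIC = "cleaned-data"
METRICS_TOPIC = "metrics-output"

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "group.id": "kafka-processor",          # hypothetical group id
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,            # offsets are committed manually
})
consumer.subscribe([SOURCE_TOPIC])

producer = Producer({"bootstrap.servers": "localhost:9092"})
```

Disabling auto-commit here is what allows the manual offset commits described in the data flow below.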
- **Message Consumption:**
  - The consumer reads messages from the `user-login` topic using the `create_consumer_with_retry` function.
  - Messages are polled using the `poll_message` function.
- **Age Filtering:**
  - Messages older than `MAX_MESSAGE_AGE` (60 seconds) are skipped.
- **Message Parsing and Validation:**
  - Messages are decoded from JSON format.
  - Checks are performed for required fields (`user_id`, `ip`, `device_id`, `app_version`, `device_type`, `timestamp`, `locale`).
  - Messages with a missing `user_id` or other required fields are logged as errors.
- **App Version Filtering:**
  - Messages with an `app_version` not equal to `2.3.0` are filtered out and logged to the `cleaned-data` topic.
- **Transformation:**
  - Messages are transformed using the `transform_message` function; transformations include hashing (encoding) the IP address and device ID.
- **Publishing:**
  - Successfully transformed messages are published to the `processed-output` topic.
  - Errors encountered during processing are logged to the `processed-errors` topic.
- **Metrics Recording:**
  - Processing metrics are recorded for each message using the `MessageMetrics` class.
  - Metrics are published to the `metrics-output` topic after each message is processed.
- **Summary Statistics:**
  - Tracks processed message counts, device types, and locales.
  - Publishes summary statistics every 1000 processed messages (`SUMMARY_PUBLISH_INTERVAL`) to the `summary-output` topic.
- **Error Handling:**
  - Various error conditions (parsing errors, missing fields, transformation errors, publishing errors) are caught and logged to the `processed-errors` topic.
- **Offset Management:**
  - The consumer manually commits offsets after successful message processing.
This data flow reflects the additional error handling, metrics recording, and more granular filtering present in the current implementation.
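As a simplified, end-to-end sketch of this loop (reusing the consumer, producer, and topic constants from the setup sketch above; the SHA-256 hashing and epoch-seconds timestamp handling are assumptions, and the summary and metrics publishing steps are omitted for brevity):

```python
# Simplified processing loop; reuses consumer, producer, and the topic
# constants from the setup sketch above. Hashing scheme and timestamp
# handling are illustrative assumptions, not the project's exact logic.
import hashlib
import json
import time

MAX_MESSAGE_AGE = 60  # seconds
REQUIRED_FIELDS = ("user_id", "ip", "device_id", "app_version",
                   "device_type", "timestamp", "locale")

def transform(record: dict) -> dict:
    """Hash the sensitive fields and attach a formatted processing time."""
    out = dict(record)
    out["ip"] = hashlib.sha256(str(record["ip"]).encode()).hexdigest()
    out["device_id"] = hashlib.sha256(str(record["device_id"]).encode()).hexdigest()
    out["processed_at"] = time.strftime("%Y-%m-%d %H:%M:%S")
    return out

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    try:
        record = json.loads(msg.value())
        ts = record.get("timestamp")
        if ts is not None and time.time() - float(ts) > MAX_MESSAGE_AGE:
            pass                                             # stale message: skip
        elif any(f not in record for f in REQUIRED_FIELDS):
            producer.produce(ERROR_TOPIC, json.dumps(
                {"error": "missing required fields", "record": record}))
        elif record["app_version"] != "2.3.0":
            producer.produce(CLEANED_TOPIC, json.dumps(record))  # filtered out
        else:
            producer.produce(OUTPUT_TOPIC, json.dumps(transform(record)))
    except (json.JSONDecodeError, ValueError) as exc:
        producer.produce(ERROR_TOPIC, json.dumps({"error": str(exc)}))

    producer.poll(0)                                   # serve delivery callbacks
    consumer.commit(message=msg, asynchronous=False)   # manual offset commit
```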
- Flexibility: A modular approach allows each component of the pipeline (e.g., consumer, producer, transformer) to be developed, tested, and maintained independently.
- Reusability: Code can be reused across different parts of the application or in future projects.
- Scalability: Modules can be scaled independently based on load and performance requirements.
The decision to hash the device ID and mask the IP address is based on several privacy considerations and legal requirements:
- **Compliance with data protection laws:**
  - GDPR considers IP addresses as personal data.
  - CCPA includes IP addresses and device IDs as personal information.
- **Balancing data utility and privacy:**
  - Hashing the device ID provides anonymization while maintaining a unique identifier.
  - Masking the IP address preserves some network-level information while protecting the specific device identity.
- **Risk mitigation:**
  - Reduces the risk of re-identification of individuals.
  - Enhances data security in case of breaches.
Advantages of this approach:
- Legal compliance: Meets requirements of various privacy laws like GDPR and CCPA.
- Data minimization: Adheres to the principle of collecting only necessary data.
- Flexibility: Allows for data analysis while protecting individual privacy.
- Trust building: Demonstrates commitment to user privacy, potentially improving reputation.
- Reduced liability: Minimizes risks associated with storing identifiable personal information.
- Maintained utility: Preserves some useful information for analytics and security purposes.
This approach strikes a balance between data protection and usability, addressing key privacy concerns while still allowing for necessary data processing and analysis.
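As a rough illustration of these two techniques (the exact logic in `transformer.py` may differ, for example in whether a salt is applied or how IPv6 addresses are handled):

```python
# Illustrative hashing/masking helpers; the real transformer may use a
# salted hash or handle IPv6 differently.
import hashlib

def hash_device_id(device_id: str) -> str:
    """Replace the raw device ID with a stable, non-reversible digest."""
    return hashlib.sha256(device_id.encode("utf-8")).hexdigest()

def mask_ip(ip: str) -> str:
    """Keep network-level information but drop the host-specific octet."""
    octets = ip.split(".")
    if len(octets) == 4:
        octets[-1] = "0"
    return ".".join(octets)

print(hash_device_id("example-device"))  # 64-character hex digest
print(mask_ip("203.0.113.42"))           # prints 203.0.113.0
```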
- Real-Time Insights: Summary statistics provide immediate visibility into the pipeline's operations, allowing for quick detection of potential issues or anomalies.
- Business Intelligence: These statistics enable the creation of dashboards that support data-driven business decisions and strategic planning.
- Operational Efficiency: By understanding data flow and processing trends, teams can optimize resource allocation and improve overall pipeline performance.
- Proactive Monitoring: Continuous tracking helps in identifying patterns that may require intervention, ensuring smooth and efficient pipeline operations.
- Centralized Error Handling: Using a Kafka topic for errors allows centralized collection and analysis of issues across distributed systems.
- Scalability: Kafka's architecture supports high-throughput error logging without impacting system performance.
- Durability: Error messages are stored durably in Kafka, allowing for replay and analysis if needed.
- Data Quality Insights: Logging filtered messages helps identify patterns or anomalies in the data that may require attention or adjustment in filtering criteria.
- Audit Trail: Provides a record of all data that was excluded from processing, which can be useful for compliance and auditing purposes.
- Optimization: Analyzing filtered data can lead to improvements in data collection processes or filtering logic, enhancing overall pipeline efficiency.
- Data Integrity: Ensures that offsets are committed only after successful message processing, preventing data loss or duplication.
- Error Recovery: Allows precise control over message acknowledgment, facilitating effective error recovery strategies.
- Reliability Enhancement: Setting retries to handle transient errors ensures that temporary network issues or broker unavailability do not result in message loss.
- System Stability: Retries help maintain consistent message delivery without significant delays, improving overall system robustness.
- Throughput Optimization: Configuring `linger.ms`, `batch.size`, and `compression.type` helps maximize throughput by efficiently batching messages for transmission (see the configuration sketch after this list).
- Resource Efficiency: Reduces network load and improves processing speed by minimizing the size of data packets sent over the network.
- Troubleshooting: Comprehensive logging captures issues at each stage of processing, providing detailed insights that facilitate quick identification and resolution of problems.
- Operational Visibility: By logging errors systematically, developers and operators gain a clear view of where and why failures occur, enabling proactive management of the pipeline.
- Continuous Improvement: Detailed error logs allow for analysis over time, helping to identify recurring issues and opportunities for optimization in the data processing workflow.
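Pulling the reliability and throughput settings above into one place, a hedged configuration sketch (broker address, group id, and numeric values are placeholders rather than the project's actual settings):

```python
# Illustrative confluent-kafka configuration reflecting the tuning
# discussed above; all concrete values are placeholders.
from confluent_kafka import Consumer, Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "retries": 5,                  # retry transient delivery failures
    "acks": "all",                 # wait for full acknowledgment
    "linger.ms": 10,               # wait briefly so messages can be batched
    "batch.size": 64 * 1024,       # maximum batch size in bytes
    "compression.type": "gzip",    # compress batches to cut network load
})

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "kafka-processor",          # hypothetical group id
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,            # commit manually after processing
})
```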
- Docker and Docker Compose
- Python 3.8.12
- Confluent Kafka Python client (`pip install confluent-kafka`)
1. **Start Your Kafka Broker:**
   - Use Docker Compose to start your Kafka broker. Navigate to the directory containing your `docker-compose.yml` file and run `docker-compose up`.
2. **Create a Virtual Environment:**
   - Navigate to your project directory and create a virtual environment:
     - venv: `python -m venv kafka-env`
     - conda: `conda create --name your_env_name python=3.8`
3. **Activate the Virtual Environment:**
   - On Windows: `.\kafka-env\Scripts\activate`
   - On macOS and Linux: `source kafka-env/bin/activate`
   - conda users: `conda activate your_env_name`
4. **Install Dependencies:**
   - Install the required packages from `requirements.txt`: `pip install -r requirements.txt`
5. **Run the Main Pipeline:**
   - Navigate to the `kafka_processor` folder and run the main script: `cd kafka_processor`, then `python main.py`
   - You will see a message in the terminal saying "Kafka Consumer has started..."
6. **Verify Data Flow with Consumers:**
   - Open five new terminal windows, activate the virtual environment in each (as shown in step 3), and run the following scripts from `kafka_processor/Verify/` to confirm that data reaches each topic:
     - `python verify_processed_output.py`
     - `python verify_processed_errors.py`
     - `python verify_summary_data.py`
     - `python verify_cleaned_data.py`
     - `python verify_metrics.py`
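For reference, a minimal sketch of what one of these verification consumers (for example `verify_processed_output.py`) might look like; the broker address and group id are assumptions:

```python
# Hypothetical sketch of a verification consumer that pretty-prints every
# record arriving on the processed-output topic.
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",   # assumed broker address
    "group.id": "verify-processed-output",   # hypothetical group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["processed-output"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print(json.dumps(json.loads(msg.value()), indent=2))
finally:
    consumer.close()
```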
By following these steps, you can set up and run your Kafka-based real-time data processing pipeline, ensuring that all components are functioning correctly.
- **Note:**
  - The `cleaned-data` topic will not show any values because all input records have an `app_version` of 2.3.0, which means no records are filtered out based on this criterion.
  - I used GitHub Codespaces to implement this project.
The Docker container for the Python producer generates messages as follows:
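The exact payload depends on the producer image, but an illustrative record with the fields this pipeline validates (all values below are made up for demonstration) might look like this:

```python
# Made-up example of a single login event; values are illustrative only.
sample_event = {
    "user_id": "424cdd21-063a-43a7-b91b-7ca1a833afae",
    "ip": "203.0.113.42",
    "device_id": "593c65c3-c584-4f3e-b217-6b1a849ee3d7",
    "app_version": "2.3.0",
    "device_type": "android",
    "locale": "RU",
    "timestamp": 1694479551,
}
```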
Looking at this data, it lends itself to real-time analytics and could also support login functionality for an application.
When running the main script, you should see:
Running `verify_processed_output.py` shows:
Running `verify_processed_errors.py` shows:
Running `verify_summary_data.py` shows:
Running `verify_metrics.py` shows:
PS: The `verify_metrics.py` output shown here was lightly reformatted for readability, but this is how it will look.
Note: The `cleaned-data` topic will not show any values because all input records have an `app_version` of 2.3.0, meaning no records are filtered out based on this criterion.
- Containerize the application components using Docker
- Deploy on Kubernetes for container orchestration and scaling
- Set up a CI/CD pipeline (e.g., Jenkins, GitLab CI, or GitHub Actions)
- Use a production-grade Kafka cluster, such as a managed service like Confluent Cloud.
- Implement services like Datadog for comprehensive monitoring and observability.
- Use infrastructure-as-code tools like Terraform for configuration management
- Implement a schema registry (e.g., Confluent Schema Registry)
- Set up SSL/TLS encryption for data in transit (see the TLS configuration sketch after this list).
- Implement data lineage and governance tools to track data flow and ensure compliance.
- Implement Performance Testing
- Configure PagerDuty, OpsGenie or Datadog Alerts for critical failures or performance issues
- Implement a robust backup and disaster recovery strategy
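For the SSL/TLS item above, a hedged sketch of what a TLS-enabled client configuration could look like with confluent-kafka (the broker address and certificate paths are placeholders for your own environment):

```python
# Placeholder TLS configuration for a confluent-kafka producer; the broker
# address and certificate paths must be replaced with your own values.
from confluent_kafka import Producer

secure_producer = Producer({
    "bootstrap.servers": "broker.example.com:9093",
    "security.protocol": "SSL",
    "ssl.ca.location": "/etc/kafka/certs/ca.pem",
    "ssl.certificate.location": "/etc/kafka/certs/client.pem",
    "ssl.key.location": "/etc/kafka/certs/client.key",
})
```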
- Increase the number of partitions for each topic to allow for better parallelism and distribution of data.
- Move from a single broker to a multi-broker cluster allowing for better distribution of partitions across multiple servers, increasing throughput and fault tolerance.
- Upgrade hardware resources (CPU, RAM, disk) of Kafka brokers and application servers (Vertical Scaling)
- Instead of a single consumer, create a consumer group with multiple consumers, allowing for parallel processing of messages from different partitions (see the sketch after this list).
- For complex data processing, Kafka Streams can help scale your data processing pipeline.
- Kafka Connect can help in efficiently moving data in and out of Kafka at scale.
- Use stream processing frameworks like Kafka Streams or Apache Flink for efficient real-time processing
- Implement intelligent load balancing for consumers
- Deploy Kafka on Kubernetes for easier scaling and management.
- Set up auto-scaling based on monitoring metrics
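As referenced in the consumer-group item above, a hedged sketch of horizontal consumer scaling and of raising the partition count (broker address, group id, worker count, and partition total are placeholders):

```python
# Each worker process (or container) runs this code with the same group.id,
# so Kafka splits the user-login partitions across them automatically.
from confluent_kafka import Consumer
from confluent_kafka.admin import AdminClient, NewPartitions

def run_worker(worker_id: int) -> None:
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",   # assumed broker address
        "group.id": "kafka-processor",           # shared group id
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["user-login"])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            # ... process the message exactly as the single consumer does ...
            print(f"worker {worker_id} read offset {msg.offset()}")
    finally:
        consumer.close()

# run_worker(0) would be launched once per process or container.

# One-off admin step to raise the partition count (placeholder total of 6):
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
for topic, future in admin.create_partitions([NewPartitions("user-login", 6)]).items():
    future.result()  # raises if the partition increase failed
```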