
Commit

Tests and fixes
v0idpwn committed Apr 3, 2023
1 parent b222a73 commit 9df2ba9
Showing 8 changed files with 331 additions and 22 deletions.
88 changes: 88 additions & 0 deletions dev-compose.yml
@@ -0,0 +1,88 @@
version: '2'
services:
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka0
- schema-registry0
- kafka-connect0
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests

kafka0:
image: confluentinc/cp-kafka:7.2.1.arm64
hostname: kafka0
container_name: kafka0
ports:
- 9092:9092
- 9997:9997
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
volumes:
- ./scripts/update_run.sh:/tmp/update_run.sh
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

schema-registry0:
image: confluentinc/cp-schema-registry:7.2.1.arm64
ports:
- 8085:8085
depends_on:
- kafka0
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schema-registry0
SCHEMA_REGISTRY_LISTENERS: http://schema-registry0:8085

SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

kafka-connect0:
image: confluentinc/cp-kafka-connect:7.2.1.arm64
ports:
- 8083:8083
depends_on:
- kafka0
- schema-registry0
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: _connect_status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry0:8085
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry0:8085
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
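
For local development this stack exposes the broker on localhost:9092 (the PLAINTEXT_HOST listener) and the kafka-ui dashboard on localhost:8080. The integration test added at the bottom of this diff connects to it with options along these lines (a minimal sketch; only the broker address comes from the compose file):

# Client options matching the broker published by dev-compose.yml; the option
# names are the ones BrodClient.start_link/1 receives in the test below.
[client_id: :my_client, brokers: [{"localhost", 9092}]]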
20 changes: 16 additions & 4 deletions lib/kafkaesque.ex
@@ -25,6 +25,9 @@ defmodule Kafkaesque do
`partition/2` should return an integer, and defaults to 0. It will be used
as the partition for the message.
See the documentation for `Kafkaesque.start_link/1` to learn about starting
Kafkaesque in your application.
"""

use GenServer
@@ -47,6 +50,19 @@ defmodule Kafkaesque do

@doc """
Starts a Kafkaesque instance. Accepts the following opts:
- `:repo`: the repo where messages will be read from. Usually should be the
same repo that you're writing to.
- `:client`: the client to be used by the publisher. Defaults to
`Kafkaesque.KafkaClients.BrodClient`
- `:client_opts`: the options to be used by the client. Defaults to `[]`. The
default client requires options, so this can be considered required for most
use-cases. Look at the client documentation for more information about the
client options.
- `:publisher_max_demand`: maximum publisher demand, can be useful for tuning.
Defaults to 200. See `GenStage` documentation for more info.
- `:publisher_min_demand`: minimum publisher demand, can be useful for tuning.
Defaults to 190. See `GenStage` documentation for more info.
"""
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
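
A sketch of how these options might be wired into an application's supervision tree, assuming a MyApp.Repo and the local broker from dev-compose.yml; the commit does not show the actual supervision setup, so treat the child spec below as illustrative only:

# Hypothetical child spec; option names come from the docstring above, and the
# {Kafkaesque, opts} tuple relies on the child_spec/1 that `use GenServer` generates.
children = [
  MyApp.Repo,
  {Kafkaesque,
   repo: MyApp.Repo,
   client_opts: [client_id: :kafkaesque_client, brokers: [{"localhost", 9092}]],
   publisher_min_demand: 190,
   publisher_max_demand: 200}
]

Supervisor.start_link(children, strategy: :one_for_one)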
@@ -75,10 +91,6 @@ defmodule Kafkaesque do
{repo, _opts} = Keyword.pop!(opts, :repo)

quote do
def start_link(opts) do
Kafkaesque.start_link(opts)
end

@spec publish(String.t(), term()) :: {:ok, Kafkaesque.Message.t()} | {:error, atom()}
def publish(topic, body) do
payload = encode(message)
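
For context, publish/2 above is generated into the module that calls use Kafkaesque, so application code would look roughly like the sketch below; MyApp.Outbox and its encode/1 are illustrative assumptions, not part of this commit:

defmodule MyApp.Outbox do
  use Kafkaesque, repo: MyApp.Repo

  # Hypothetical payload encoding; only illustrates turning a term into a binary.
  def encode(body), do: :erlang.term_to_binary(body)
end

# Stores the message through the repo; publishing to Kafka happens asynchronously.
{:ok, _message} = MyApp.Outbox.publish("integration_test_topic", %{id: 42})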
7 changes: 4 additions & 3 deletions lib/kafkaesque/acknowledger.ex
@@ -16,17 +16,18 @@ defmodule Kafkaesque.Acknowledger do
@impl GenStage
def init(opts) do
repo = Keyword.fetch!(opts, :repo)
producer_pid = Keyword.fetch!(opts, :publishr_pid)
publisher_pid = Keyword.fetch!(opts, :publisher_pid)

{
:consumer,
%{repo: repo},
[subscribe_to: [producer_pid]]
[subscribe_to: [publisher_pid]]
}
end

# TODO: possibly perform additional batching for performance in cases where
# batches are mostly composed by messages from different queues.
# workload is mostly composed by messages from different queues (thus coming
# in different batches)
@impl GenStage
def handle_events(events, _from, state) do
Enum.each(events, &handle_event(&1, state))
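
One possible shape for the batching mentioned in the TODO above would be to merge all batches received in a single handle_events call before touching the repo. Purely illustrative and not part of this commit; the {:success_batch, ids} / {:failure_batch, ids} event shape is taken from the publisher changes further down, and ack_success/2 and ack_failure/2 are hypothetical helpers:

def handle_events(events, _from, state) do
  # Merge the id lists of all batches that arrived together, so there is one
  # repo round-trip per outcome instead of one per event.
  grouped =
    events
    |> Enum.group_by(fn {kind, _ids} -> kind end, fn {_kind, ids} -> ids end)
    |> Map.new(fn {kind, id_lists} -> {kind, List.flatten(id_lists)} end)

  ack_success(state.repo, Map.get(grouped, :success_batch, []))
  ack_failure(state.repo, Map.get(grouped, :failure_batch, []))

  {:noreply, [], state}
end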
22 changes: 11 additions & 11 deletions lib/kafkaesque/kafka_clients/brod_publisher.ex
@@ -16,7 +16,7 @@ defmodule Kafkaesque.KafkaClients.BrodClient do

@behaviour Kafkaesque.Client

@default_opts [reconnect_cool_down_seconds: 2]
@default_opts [reconnect_cool_down_seconds: 2, auto_start_producers: true]

@impl Kafkaesque.Client
def start_link(opts) do
@@ -34,33 +34,33 @@ defmodule Kafkaesque.KafkaClients.BrodClient do
@impl Kafkaesque.Client
def publish(%{brod_client_id: client_id, task_supervisor: task_supervisor}, messages) do
# We pre-process the message bodies to avoid copying unecessary data to the task
message_batches = Enum.group_by(messages, &{&1.partition_key, &1.topic}, &{"", &1.body})
message_batches = Enum.group_by(messages, &{&1.partition, &1.topic}, &{"", &1.body})

task_results =
task_supervisor
|> Task.Supervisor.async_stream_nolink(message_batches, fn {{partition_key, topic}, values} ->
case :brod.produce_sync(client_id, topic, partition_key, "", values) do
|> Task.Supervisor.async_stream_nolink(message_batches, fn {{partition, topic}, values} ->
case :brod.produce_sync(client_id, topic, partition, "", values) do
:ok ->
{{partition_key, topic}, :success}
{{partition, topic}, :success}

error ->
log_error(topic, partition_key, error)
{{partition_key, topic}, :error}
log_error(topic, partition, error)
{{partition, topic}, :error}
end
end)
|> Map.new(fn {:ok, task_result} -> task_result end)

result =
Enum.group_by(messages, fn m ->
Map.fetch!(task_results, {m.partition_key, m.topic})
Map.fetch!(task_results, {m.partition, m.topic})
end)

{:ok, result}
end
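
The Enum.group_by/3 call above batches the pending messages so that :brod.produce_sync/5 runs once per {partition, topic} pair rather than once per message. A standalone illustration of that grouping, with made-up message maps:

messages = [
  %{partition: 0, topic: "a", body: "m1"},
  %{partition: 0, topic: "a", body: "m2"},
  %{partition: 1, topic: "b", body: "m3"}
]

Enum.group_by(messages, &{&1.partition, &1.topic}, &{"", &1.body})
#=> %{{0, "a"} => [{"", "m1"}, {"", "m2"}], {1, "b"} => [{"", "m3"}]}

The empty string in each pair is the Kafka message key, matching the "" passed to :brod.produce_sync/5 above.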

defp log_error(topic, partition_key, error) do
("{inspect(__MODULE__)} failure publishing message batch to topic " <>
"#{inspect(topic)} partition #{inspect(partition_key)} with error " <>
defp log_error(topic, partition, error) do
("#{inspect(__MODULE__)} failure publishing message batch to topic " <>
"#{inspect(topic)} partition #{inspect(partition)} with error " <>
inspect(error))
|> Logger.warn()
end
10 changes: 6 additions & 4 deletions lib/kafkaesque/publisher.ex
@@ -7,8 +7,8 @@ defmodule Kafkaesque.Publisher do
- `:client`: the client module that will be used to publish the messages.
Defaults to `Kafkaesque.KafkaClients.BrodClient`.
- `:client_opts`: A list of options to be passed to the client on startup.
Defaults to []. The default client requires options, so this can be considered
required for most use-cases.
Defaults to `[]`. The default client requires options, so this can be
considered required for most use-cases.
"""

use GenStage
@@ -22,6 +22,8 @@ defmodule Kafkaesque.Publisher do
client_mod = Keyword.get(opts, :client, Kafkaesque.KafkaClients.BrodClient)
client_opts = Keyword.get(opts, :client_opts, [])
producer_pid = Keyword.fetch!(opts, :producer_pid)
# min_demand = Keyword.get(opts, :publisher_min_demand, 190)
# max_demand = Keyword.get(opts, :publisher_max_demand, 200)

{:ok, client} = client_mod.start_link(client_opts)
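
The two commented-out lines read the demand options but are not used yet. In GenStage, such limits are usually forwarded as subscription options, roughly as sketched below; the actual subscription code is folded out of this diff, so this is an assumption about where they would go:

# Hypothetical wiring, not part of this commit; state as built by init/1.
min_demand = Keyword.get(opts, :publisher_min_demand, 190)
max_demand = Keyword.get(opts, :publisher_max_demand, 200)

{:producer_consumer, state,
 subscribe_to: [{producer_pid, min_demand: min_demand, max_demand: max_demand}]}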

@@ -35,10 +37,10 @@ defmodule Kafkaesque.Publisher do
@impl GenStage
def handle_events(messages, _from, state) do
case state.client_mod.publish(state.client, messages) do
{:ok, %{sucess: success, failure: failure}} ->
{:ok, %{success: success, error: error}} ->
events = [
{:success_batch, Enum.map(success, & &1.id)},
{:failure_batch, Enum.map(failure, & &1.id)}
{:failure_batch, Enum.map(error, & &1.id)}
]

{:noreply, events, state}
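
After this change the publisher emits one event per outcome, which is what the acknowledger subscribes to. The shapes, assuming integer message ids:

# Events flowing from the publisher stage to the acknowledger stage:
[
  {:success_batch, [101, 102, 103]},
  {:failure_batch, [104]}
]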
11 changes: 11 additions & 0 deletions scripts/update_run.sh
@@ -0,0 +1,11 @@
# This script is required to run kafka cluster (without zookeeper)
#!/bin/sh

# Docker workaround: Remove check for KAFKA_ZOOKEEPER_CONNECT parameter
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure

# Docker workaround: Ignore cub zk-ready
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure

# KRaft required step: Format the storage directory with a new cluster ID
echo "kafka-storage format --ignore-formatted -t $(kafka-storage random-uuid) -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
78 changes: 78 additions & 0 deletions test/kafkaesque/brod_client_test.exs
@@ -0,0 +1,78 @@
defmodule Kafkaesque.KafkaClients.BrodClientTest do
use Kafkaesque.Case, async: false

alias Kafkaesque.KafkaClients.BrodClient

setup_all do
topic_configs = [
%{
configs: [
%{
name: "cleanup.policy",
value: "compact"
},
%{
name: "confluent.value.schema.validation",
value: false
}
],
num_partitions: 1,
replication_factor: 1,
assignments: [],
name: "integration_test_topic"
},
%{
configs: [
%{
name: "cleanup.policy",
value: "compact"
},
%{
name: "confluent.value.schema.validation",
value: true
}
],
num_partitions: 1,
replication_factor: 1,
assignments: [],
name: "integration_test_topic_2"
}
]

_ = :brod.create_topics([{"localhost", 9092}], topic_configs, %{timeout: 15_000})

:ok
end

# required a running kafka instance
@tag :integration
test "starts and publishes messages" do
assert {:ok, client} =
BrodClient.start_link(
client_id: :my_client,
brokers: [{"localhost", 9092}]
)

# Success case
message = %Kafkaesque.Message{
id: 1,
partition: 0,
topic: "integration_test_topic",
body: "test_message"
}

assert {:ok, %{success: [%Message{body: "test_message"}]}} =
BrodClient.publish(client, [message])

# Failure case
message = %Kafkaesque.Message{
id: 1,
partition: 9,
topic: "integration_test_topic_2",
body: "test_message"
}

assert {:ok, %{error: [%Message{body: "test_message"}]}} =
BrodClient.publish(client, [message])
end
end
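
These tests are tagged :integration and expect the broker from dev-compose.yml on localhost:9092. A common way to keep them out of the default run is excluding the tag in test_helper.exs (whether this project already does so is not visible in this diff); they can then be run explicitly with mix test --only integration once the compose stack is up:

# test/test_helper.exs — hypothetical, excludes integration tests by default.
ExUnit.start(exclude: [:integration])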
