Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
williamhbaker committed Oct 25, 2024
1 parent fd9f54a commit f4e38fc
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
4 changes: 3 additions & 1 deletion source-kafka/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ services:
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://db:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://db:29092,PLAINTEXT_HOST://source-kafka-db-1.flow-test:9092'
# Replace the line above with the one below for running tests locally:
# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://db:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
Expand Down
1 change: 1 addition & 0 deletions source-kafka/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub async fn run_connector(
});

let pull = tokio::spawn(do_pull(req, stdout));

tokio::select! {
pull_res = pull => pull_res??,
eof_res = eof => eof_res??,
Expand Down
12 changes: 6 additions & 6 deletions source-kafka/src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ lazy_static! {
};
}

fn bytes_to_key(vec: &[u8]) -> Key {
assert!(vec.len() == 32, "The key must be exactly 32 bytes long.");
fn bytes_to_key(key: &[u8]) -> Key {
assert!(key.len() == 32, "The key must be exactly 32 bytes long.");

Key([
u64::from_le_bytes(vec[0..8].try_into().unwrap()),
u64::from_le_bytes(vec[8..16].try_into().unwrap()),
u64::from_le_bytes(vec[16..24].try_into().unwrap()),
u64::from_le_bytes(vec[24..32].try_into().unwrap()),
u64::from_le_bytes(key[0..8].try_into().unwrap()),
u64::from_le_bytes(key[8..16].try_into().unwrap()),
u64::from_le_bytes(key[16..24].try_into().unwrap()),
u64::from_le_bytes(key[24..32].try_into().unwrap()),
])
}

Expand Down
2 changes: 0 additions & 2 deletions tests/source-kafka/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ export TEST_STREAM="estuary-test-$(shuf -zer -n6 {a..z} | tr -d '\0')"
export RESOURCE="{\"topic\": \"${TEST_STREAM}\"}"
export CONNECTOR_CONFIG='{"bootstrap_servers": "source-kafka-db-1.flow-test:9092"}'

echo $RESOURCE

docker compose -f source-kafka/docker-compose.yaml up --wait --detach

docker exec source-kafka-db-1 sh -c "/bin/kafka-topics --create --topic ${TEST_STREAM} --bootstrap-server localhost:9092"
Expand Down

0 comments on commit f4e38fc

Please sign in to comment.