diff --git a/source-kafka/docker-compose.yaml b/source-kafka/docker-compose.yaml index dedaf1c13..f7697a1c7 100644 --- a/source-kafka/docker-compose.yaml +++ b/source-kafka/docker-compose.yaml @@ -4,7 +4,9 @@ services: environment: KAFKA_NODE_ID: 1 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://db:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://db:29092,PLAINTEXT_HOST://source-kafka-db-1.flow-test:9092' + # Replace the line above with the one below for running tests locally: + # KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://db:29092,PLAINTEXT_HOST://localhost:9092' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 diff --git a/source-kafka/src/lib.rs b/source-kafka/src/lib.rs index 76b909734..d3963e904 100644 --- a/source-kafka/src/lib.rs +++ b/source-kafka/src/lib.rs @@ -97,6 +97,7 @@ pub async fn run_connector( }); let pull = tokio::spawn(do_pull(req, stdout)); + tokio::select! { pull_res = pull => pull_res??, eof_res = eof => eof_res??, diff --git a/source-kafka/src/pull.rs b/source-kafka/src/pull.rs index e15fe4ccf..58ccd4598 100644 --- a/source-kafka/src/pull.rs +++ b/source-kafka/src/pull.rs @@ -190,14 +190,14 @@ lazy_static! { }; } -fn bytes_to_key(vec: &[u8]) -> Key { - assert!(vec.len() == 32, "The key must be exactly 32 bytes long."); +fn bytes_to_key(key: &[u8]) -> Key { + assert!(key.len() == 32, "The key must be exactly 32 bytes long."); Key([ - u64::from_le_bytes(vec[0..8].try_into().unwrap()), - u64::from_le_bytes(vec[8..16].try_into().unwrap()), - u64::from_le_bytes(vec[16..24].try_into().unwrap()), - u64::from_le_bytes(vec[24..32].try_into().unwrap()), + u64::from_le_bytes(key[0..8].try_into().unwrap()), + u64::from_le_bytes(key[8..16].try_into().unwrap()), + u64::from_le_bytes(key[16..24].try_into().unwrap()), + u64::from_le_bytes(key[24..32].try_into().unwrap()), ]) } diff --git a/tests/source-kafka/setup.sh b/tests/source-kafka/setup.sh index 88bc4cc13..f8fcd24a8 100755 --- a/tests/source-kafka/setup.sh +++ b/tests/source-kafka/setup.sh @@ -5,8 +5,6 @@ export TEST_STREAM="estuary-test-$(shuf -zer -n6 {a..z} | tr -d '\0')" export RESOURCE="{\"topic\": \"${TEST_STREAM}\"}" export CONNECTOR_CONFIG='{"bootstrap_servers": "source-kafka-db-1.flow-test:9092"}' -echo $RESOURCE - docker compose -f source-kafka/docker-compose.yaml up --wait --detach docker exec source-kafka-db-1 sh -c "/bin/kafka-topics --create --topic ${TEST_STREAM} --bootstrap-server localhost:9092"