
Kafka is not supported? #171

Closed
ar0ne opened this issue Apr 18, 2017 · 8 comments
@ar0ne

ar0ne commented Apr 18, 2017

Hi all,
I tried to use Toxiproxy to simulate bad connections to a Kafka cluster. It works as a plain proxy, but when I add a toxic it breaks the connection and the client switches to the original URL directly (instead of going through the proxy, we receive messages without delay), or sometimes it just drops messages (we don't receive them at all).
I use a Kafka cluster (https://github.com/wurstmeister/kafka-docker) and toxiproxy-server.

$ docker ps -a
CONTAINER ID        IMAGE                       COMMAND                  CREATED             STATUS              PORTS                                                NAMES
51af67256d9e        kafkadocker_kafka           "start-kafka.sh"         23 hours ago        Up 23 hours         0.0.0.0:9092->9092/tcp                               kafkadocker_kafka_1
ccda8f90998a        wurstmeister/zookeeper      "/bin/sh -c '/usr/..."   23 hours ago        Up 23 hours         22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1 

And I added a proxy, 9999 -> 9092:

{
	"test_proxy": {
		"name": "test_proxy",
		"listen": "127.0.0.1:9999",
		"upstream": "0.0.0.0:9092",
		"enabled": true,
		"toxics": [
			{
				"attributes": {
					"latency": 2000,
					"jitter": 500
				},
				"name": "latency_downstream",
				"type": "latency",
				"stream": "downstream",
				"toxicity": 1
			}
		]
	}
}
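
For reference, the same proxy and toxic can also be registered programmatically. A minimal sketch against Toxiproxy's HTTP API, assuming the server exposes its default API endpoint on 127.0.0.1:8474 (adjust if yours differs):

```python
# Sketch: create the proxy and latency toxic above via Toxiproxy's HTTP API.
# Assumption: toxiproxy-server is running with its default API on 127.0.0.1:8474.
import json
import urllib.request

API = "http://127.0.0.1:8474"

def proxy_payload():
    # Mirrors the JSON config above. Note: the original config's upstream of
    # 0.0.0.0:9092 is unusual as a dial target; a concrete address is safer.
    return {
        "name": "test_proxy",
        "listen": "127.0.0.1:9999",
        "upstream": "127.0.0.1:9092",
        "enabled": True,
    }

def toxic_payload():
    return {
        "name": "latency_downstream",
        "type": "latency",
        "stream": "downstream",
        "toxicity": 1.0,
        "attributes": {"latency": 2000, "jitter": 500},
    }

def post(path, payload):
    # POST a JSON body to the Toxiproxy HTTP API.
    req = urllib.request.Request(
        API + path,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)

# With the server running, the calls would be:
# post("/proxies", proxy_payload())                    # create the proxy
# post("/proxies/test_proxy/toxics", toxic_payload())  # attach the toxic
```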

I tried to send a message (any string) with the toxic enabled, and then got this:

$ ./kafka-console-producer.sh --broker-list localhost:9999 --topic ptf.inbound.if
...
[2017-04-17 18:44:31,076] WARN Fetching topic metadata with correlation id 40 for topics [Set(ptf.inbound.if)] from broker [id:0,host:localhost,port:9999] failed (kafka.client.ClientUtils$)
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	at kafka.utils.Utils$.read(Utils.scala:380)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
	at kafka.utils.Utils$.swallow(Utils.scala:172)
	at kafka.utils.Logging$class.swallowError(Logging.scala:106)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2017-04-17 18:44:31,077] ERROR fetching topic metadata for topics [Set(ptf.inbound.if)] from broker [ArrayBuffer(id:0,host:localhost,port:9999)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(ptf.inbound.if)] from broker [ArrayBuffer(id:0,host:localhost,port:9999)] failed
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
	at kafka.utils.Utils$.swallow(Utils.scala:172)
	at kafka.utils.Logging$class.swallowError(Logging.scala:106)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	at kafka.utils.Utils$.read(Utils.scala:380)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
	... 12 more
[2017-04-17 18:44:31,078] ERROR Failed to send requests for topics ptf.inbound.if with correlation ids in [33,40] (kafka.producer.async.DefaultEventHandler)
[2017-04-17 18:44:31,078] ERROR Error in handling batch of 4 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Then I tried to consume messages, but without success.

$ ./kafka-console-consumer.sh --zookeeper 0.0.0.0:2181 --topic ptf.inbound.if  --from-beginning
$ toxiproxy-server
...
INFO[0134] Accepted client                               client=127.0.0.1:50349 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0135] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50350->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0136] Accepted client                               client=127.0.0.1:50366 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0137] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50367->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0137] Accepted client                               client=127.0.0.1:50384 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0139] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50385->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0139] Accepted client                               client=127.0.0.1:50401 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0140] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50402->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0140] Accepted client                               client=127.0.0.1:50418 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0142] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50419->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0142] Accepted client                               client=127.0.0.1:50435 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0143] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50436->127.0.0.1:9092: use of closed network connection name=test_proxy

Is this okay, or am I missing something?
Thanks

@jpittis
Contributor

jpittis commented Apr 18, 2017

I think in this case the Source terminated / use of closed network connection means that Toxiproxy is reading from Kafka while, or after, Kafka closes the connection.

My feeling is that this issue is not with Toxiproxy; rather, the upstream Kafka at 127.0.0.1:9092 is closing the connection for some reason.

@jpittis
Contributor

jpittis commented Apr 18, 2017

I may have time to try reproducing this later in the week. :)

@jpittis jpittis self-assigned this Apr 18, 2017
@ar0ne
Author

ar0ne commented May 17, 2017

Hi @jpittis, any update on the above?

@ananthakumaran

ananthakumaran commented Jun 19, 2018

We have used Toxiproxy successfully with Kafka. The issue here (as with most distributed systems, e.g. Redis Sentinel) is that you specify only the coordinators' (or brokers', or whatever they're called) IP addresses in the client config. Once the client establishes a connection with any coordinator, it asks that coordinator for the list of nodes. Unless configured explicitly, the coordinator will just hand out the IP addresses, and the client will then connect directly to those addresses, bypassing Toxiproxy completely.

In Kafka's case, the advertised host name can be configured:

advertised.host.name=KAFKA_HOSTNAME

We use hacks in docker-compose to trick DNS into resolving kafka to Toxiproxy. From all other containers kafka resolves to the original node, but from the consumer container it resolves to Toxiproxy. You still need to make sure the port numbers are the same in Kafka and Toxiproxy:

    links:
      - toxiproxy:kafka
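
The bypass described above can be sketched with a toy resolver (no real Kafka or Toxiproxy involved; all hostnames and IPs below are made up for illustration):

```python
# Toy model of the bootstrap-then-reconnect behaviour: the client dials the
# *advertised* (host, port) from the metadata response, not the bootstrap address.

def connect_target(advertised, resolver):
    """Address the client actually dials after metadata discovery.
    `advertised` is the (host, port) the broker advertises;
    `resolver` maps hostnames to IPs, like /etc/hosts."""
    host, port = advertised
    return (resolver.get(host, host), port)

# Broker advertises its raw IP: the client connects directly, skipping the proxy.
direct = connect_target(("192.168.1.10", 9092), {})

# Broker advertises a hostname, and the client's resolver points that hostname
# at the Toxiproxy host (10.0.0.5): all traffic now flows through the proxy.
proxied = connect_target(("test-kafka", 9092), {"test-kafka": "10.0.0.5"})

print(direct)   # ('192.168.1.10', 9092)
print(proxied)  # ('10.0.0.5', 9092)
```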

@amitech74

> We have used toxiproxy successfully with kafka. The issue here is (and with most distributed systems like redis sentinel etc) you just specify the coordinators (or broker or whatever it's called) ip addresses in the client config. Once the client establishes connection with any coordinator, it will ask the coordinator for the list of nodes. Unless configured explicitly, the coordinator will just give the ip addresses. Now the client will just directly connect to the given ip addresses bypassing toxiproxy completely.
>
> In kafka case, the advertised host name can be configured.
>
> advertised.host.name=KAFKA_HOSTNAME
>
> We use hacks in docker-compose to trick the dns to resolve kafka to toxiproxy. So from all other containers kafka will resolve to the original node, but from consumer container it will resolve to toxiproxy. You still need to make sure the port numbers are same in kafka & toxiproxy
>
>     links:
>       - toxiproxy:kafka

Do you have a working example of how to set this up? If the coordinator hands out the Kafka IP addresses, how does traffic still go through Toxiproxy? I mean in the case of multiple Kafka brokers.
I'm trying to set this up in a non-Docker environment and I'm hitting this exact problem: the client skips Toxiproxy and connects to the brokers directly.

@ananthakumaran

broker config

  kafka:
    build:
      context: ../../docker/kafka
    environment:
      KAFKA_HOSTNAME: test-kafka
    extra_hosts:
      - "test-kafka:127.0.0.1"

server.properties

##################### test specifics #########################
advertised.host.name={{KAFKA_HOSTNAME}}
advertised.port={{KAFKA_PORT}}
advertised.listeners=PLAINTEXT://{{KAFKA_HOSTNAME}}:{{KAFKA_PORT}}
listeners=PLAINTEXT://0.0.0.0:{{KAFKA_PORT}}
port={{KAFKA_PORT}}

client config

  consumer:
    links:
      - kafka
      - toxiproxy:test-kafka

The above config is what we use. In a non-Docker environment it should be simple as well: edit the /etc/hosts file on the client and point the Kafka hostname at the Toxiproxy address. The port Toxiproxy listens on should be the same as the upstream's. That way, whenever the client wants to connect to any broker, the hostname resolves to Toxiproxy.

> Do you have a working example of how to set this up?

I can't share the full code.

> If the coordinator hands out the Kafka IP addresses, how does it go through Toxiproxy?

The advertised.host.name can be configured to use a hostname instead of an IP.

> I mean in the case of multiple Kafka brokers.

Just configure multiple hostnames and add multiple entries to the client's /etc/hosts.
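
For instance, with three brokers the client's /etc/hosts might look like this (the hostnames and the Toxiproxy address are hypothetical; Toxiproxy then needs one proxy per broker, each listening on that broker's advertised port):

```
# client /etc/hosts — every advertised broker hostname resolves to the Toxiproxy host
10.0.0.5   test-kafka-1
10.0.0.5   test-kafka-2
10.0.0.5   test-kafka-3
```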

@miry miry closed this as completed Sep 17, 2021
@Anu5399

Anu5399 commented Jan 27, 2024

The same issue is happening with a Redis cluster as well; I'm getting an error like "use of closed network connection". Can you help me with a solution?

@miry
Contributor

miry commented Jan 28, 2024

@Anu5399 I would suggest creating a separate issue with an example of how it can be reproduced: the Toxiproxy setup, sample applications, logs, etc.
