kafka consumer cannot connect
I am running a kafka and a zookeeper in docker.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
34a113626eb6 confluent/kafka "/usr/local/bin/ka..." 11 minutes ago Up 11 minutes 0.0.0.0:9092->9092/tcp kafka
552d1d787b9e confluent/zookeeper "/usr/local/bin/zk..." 12 minutes ago Up 12 minutes 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp zookeeper
But when I use kafka command line to consumer message, I saw this error:
iMac:kafka lluo$ bin/kafka-console-consumer.sh --zookeeper `docker-machine ip bigdata`:2181 --topic bigdata
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2018-10-09 09:09:05,504] WARN [ConsumerFetcherThread-console-consumer-62804_Russells-iMac.hsd1.ca.comcast.net-1539101285161-d8affa4e-0-0]: Error in fetch to broker 0, request Name: FetchRequest; Version: 3; CorrelationId: 0; ClientId: console-consumer-62804; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; MaxBytes:2147483647 bytes; RequestInfo: ([bigdata,0],PartitionFetchInfo(5,1048576)) (kafka.consumer.ConsumerFetcherThread)
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:102)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:135)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:134)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:133)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:114)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:35)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Does anyone solve this issue before?
1 answer
-
answered 2018-10-09 17:02
Giorgos Myrianthous
Instead of
zookeeper
, you need to usebootstrap-server
(port 9092 is the default). The warning you get clearly indicates that you've been using the old consumer which is deprecated.Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Since Kafka has moved the offset storage from Zookeeper to Brokers, Kafka consumer requires
bootstrap-server
parameter as it needs to connect to the Kafka Brokers and therefore, there is no need to communicate with Zookeeper directly.