Kafka consumer cannot connect

I am running Kafka and ZooKeeper in Docker.

CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                    NAMES
34a113626eb6        confluent/kafka       "/usr/local/bin/ka..."   11 minutes ago      Up 11 minutes       0.0.0.0:9092->9092/tcp                                                   kafka
552d1d787b9e        confluent/zookeeper   "/usr/local/bin/zk..."   12 minutes ago      Up 12 minutes       0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp   zookeeper

But when I use the Kafka command-line tools to consume messages, I see this error:

iMac:kafka lluo$ bin/kafka-console-consumer.sh --zookeeper `docker-machine ip bigdata`:2181 --topic bigdata
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2018-10-09 09:09:05,504] WARN [ConsumerFetcherThread-console-consumer-62804_Russells-iMac.hsd1.ca.comcast.net-1539101285161-d8affa4e-0-0]: Error in fetch to broker 0, request Name: FetchRequest; Version: 3; CorrelationId: 0; ClientId: console-consumer-62804; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; MaxBytes:2147483647 bytes; RequestInfo: ([bigdata,0],PartitionFetchInfo(5,1048576)) (kafka.consumer.ConsumerFetcherThread)
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:102)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:135)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:134)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:133)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:114)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:35)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)

Has anyone solved this issue before?

1 answer

  • answered 2018-10-09 17:02 Giorgos Myrianthous

    Instead of --zookeeper, you need to pass --bootstrap-server and point it at the broker (9092 is the default broker port). The warning you get indicates that you are using the old consumer, which is deprecated:

    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    

    Since Kafka moved offset storage from ZooKeeper to the brokers, the new consumer takes the bootstrap-server parameter and connects to the Kafka brokers directly, so it no longer needs to communicate with ZooKeeper.
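As a rough sketch of what that change might look like for the command in the question: the small wrapper below passes --bootstrap-server instead of --zookeeper. The function name consume_topic is hypothetical and only used for illustration, port 9092 is taken from the docker ps output above, and the sketch assumes it is run from the Kafka installation directory and that the broker advertises an address reachable from the host.

```shell
# Hypothetical helper: consume a topic with the new consumer.
#   $1 = broker host (assumed reachable from this machine)
#   $2 = topic name
consume_topic() {
  # --bootstrap-server replaces the deprecated --zookeeper option;
  # --from-beginning also reads messages that are already in the topic.
  bin/kafka-console-consumer.sh \
    --bootstrap-server "$1:9092" \
    --topic "$2" \
    --from-beginning
}
```

With the Docker machine from the question, this would presumably be invoked as: consume_topic "$(docker-machine ip bigdata)" bigdata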