SEO Сервисы и программы
6 просмотров
Рейтинг статьи
1 звезда2 звезды3 звезды4 звезды5 звезд

Apache kafka java

Spring Apache Kafka Tutorial

In this Spring Apache Kafka lesson, we will learn how we can get started with Apache Kafka in a Spring Boot project and start producing and consuming messages on the topic of our choice. Apart from a simple project, we will deep dive into Kafka terminologies and how the concept of partitioning works in Kafka. Let’s get started.

1. Introduction

With the uprise of Microservices, the necessity of asynchronous communication between the involved services became a mainstream requirement. Actually, that is how Apache Kafka came into existence at LinkedIn. The main requirements of the new asynchronous communication system they needed were message persistence and high throughput. Once LinkedIn was able to make a project in the face of Kafka, they donated the project to Apache Software foundation where it came to be known as Apache Kafka.

Table Of Contents

2. What is Apache Kafka?

Kafka was developed at LinkedIn in the year 2010 and it was donated to Apache Software Foundation where it became one of the top projects by 2012. Apache Kafka has three main components in its ecosystem:

  1. Publisher-Subscriber: This component of Kafka is responsible to send and consume data across Kafka Nodes (more on Nodes in coming sections) and the consumer apps which can scale with very high throughput
  2. Kafka Streams: With Kafka streaming API, it is possible to process incoming data into kafka at near real-time
  3. Connect API: With the Connect API, it is possible to integrate many external data sources and data sinks with Kafka

For a high-level definition, we can provide a simple definition for Apache Kafka:

Apache Kafka is a distributed, fault-tolerant, horizontally-scalable, commit log.

Let us elaborate what we just said:

  • Distributed: Kafka is a distributed system where all the messages are replicated across various nodes so that each server is capable of responding to the client for the messages it contains. Also, even if one node fails, other nodes can quickly take over without any downtime
  • Fault-tolerant: As Kafka does not have a Single Point of Failure, even if one of the node goes down, the end-user will hardly notice this as the other parts take responsibility of the messages which are lost due to the failed node
  • Horizontally-scalable: Kafka allows us to add more machines to the cluster with zero downtime. This means that if we start to face lag in messages due to a low number of servers in the cluster, we can quickly add more servers and maintain the performance of the system
  • Commit Log: A commit log refers to a structure similar to a Linked List. The order of insertion of messages is maintained and data cannot be removed from this log until a threshold time is reached

A lot more concepts about Apache Kafka will become clear in the coming sections where we talk about basic terminologies used in Apache Kafka lesson.

3. Apache Kafka Terminologies

Before we can move ahead with Kafka concepts and sample project, we must understand basic terms related to Apache Kafka. Some of these are:

  • Producer: This component publish messages to the Kafka cluster
  • Consumer: This component consume messages from the Kafka cluster
  • Message: This is the data which is sent by the producer to the cluster
  • Connection: The producer needs to establesh a TCP connection to publish message. Same is needed in consumer app to consume data from the Kafka cluster
  • Topic: A Topic is a logical grouping of similar messages. The producer app can publish message to a particular Topic and can consume from a specific Topic
  • Topic Partition: To scale a topic memory as it can contain a lot of messages, a single Topic is divided into partitions and each partition can live on any node in the cluster, following image shows how message is written into multiple partitions:

Topic partitioning in Kafka

4. Installing Kafka and Making a Topic

To download and install Kafka, we can refer to the Official Kafka guide provided here. When the Kafka server is up and running, we can create a new topic, named as javacodegeeks with the following command:

Creating a topic

We will see the following output once this command executes:

Kafka create topic

5. Making the Spring Boot Project with Maven

We will be using one of the many Maven archetypes to create a sample project for our example. To create the project execute the following command in a directory that you will use as workspace:

Creating a Project

If you are running maven for the first time, it will take a few seconds to accomplish the generate command because maven has to download all the required plugins and artifacts in order to make the generation task. Once we run this project, we will see following output and the project will be created:

Creating Kafka project

6. Adding Maven dependencies

Once you have created the project, feel free to open it in your favourite IDE. Next step is to add appropriate Maven Dependencies to the project. We will work with following dependencies in our project:

  • spring-boot-starter-web : This dependency marks this project as a Web project and it adds dependencies in order to create controllers and make Web-related classes.
  • spring-kafka : This is the dependency which brings all the Kafka related dependencies into the project classpath
  • spring-boot-starter-test : This dependency collects all test related JARs into the project like JUnit and Mockito.

Here is the pom.xml file with the appropriate dependencies added:

Find the latest Maven dependencies on Maven Central.

Finally, to understand all the JARs which are added to the project when we added this dependency, we can run a simple Maven command which allows us to see a complete Dependency Tree for a project when we add some dependencies to it. Here is a command which we can use:

Check Dependency Tree

When we run this command, it will show us the following Dependency Tree:

Noticed something? So many dependencies were added by just adding four dependencies to the project. Spring Boot collects all related dependencies itself and leave nothing for us in that matter. The biggest advantage is that all these dependencies are guranteed to be copatible with each other.

7. Project Structure

Before we move on and start working on the code for the project, let’s present here the project structure we will have once we’re finished adding all the code to the project:

We have divided the project into multiple packages so that the principle of separation of concern is followed and code remains modular.

8. Adding configuration

Before we can start writing code for our project, we need to provide some properties in the file of our Spring Boot project:

These are some properties which we will be using in our project as the topic and group ID for the message we will publish and consume. Also, 9092 is the default port for Apache Kafka. Note that we could have defined multiple topics here as well with different name for the keys.

9. Defining the Producer configuration

We will start by defining the configuration for a Producer. The only property we need to define mandatorily for a Kafka Producer is the address for Kafka server with its port.

Although above class definition is pretty simple, we still have some points which we need to understand:

  • @Configuration : This class is defined as a configuration class which means that this class will be picked up by Spring Boot automatically and all beans defined inside this class will be managed by Spring container automatically.
  • We defined a bean for ProducerFactory which takes input as various properties like the Kafka server address and other serialization properties which hels to code and decode the message sent through the Kafka producer bean.
  • Finally, we defined a bean for KafkaTemplate which is the actual API object which will be used to publish the message on a Kafka topic.

10. Defining the Consumer configuration

As we are making Kafka producer and consumer in the same app for demonstration purposes, we will also define a consumer config class which will just contain basic properties for a Kafka consumer. This class can be put inside any project which isn’t a producer and only a Kafka consumer as well without any changes. Let’s look at the configuration definition:

The configuration we provided is very similar to the producer configuration. The only difference to note here is:

  • We defined a ConsumerFactory class object bean which considers the Kafka server address and the consumer group ID as well which this Kafka consumer app belongs to. We have provided a unique String to the consumer as only a unique string is acceptable
  • Finally, we defined a ConcurrentKafkaListenerContainerFactory which makes sure that this consumer application can consume Kafka messages at a concurrent pace and can give a consistent throughput even when published messages are high in number.

11. Defining the Spring Boot class

At the final stage, we will make the Spring Boot class with which we can publish a message, consume the message on the same topic. Here is the class definition for the main class:

We have used an interface CommandLineRunner to make this class run code with which we can test the producer and config class code we wrote. In this class, we publish a message to the specified topic and listen for it in the listener method we defined in the same class.

In the next section, we will run our project with a simple Maven command.

12. Running the project

Now that the main class definition is done, we can run our project. Running the application is easy with maven, just use the following command:

Running the Project

Once we execute the above command, we will see that a message has been published on the specified topic and same app consumed the message as well:

Running Spring Boot Kafka project

13. Kafka Partitioning

As a final concept, we will touch upon how Topic partitioning is done in Apache Kafka. We will start with a very simple illustrative image which shows how leaders exist in a Topic partition:

When a write happens for a topic at a position for which Partition 0 in the Broker 0 is the leader, this data is replicated across the nodes so that the message remains safe. This means that message will be replicated across Partition 0 for all three brokers shown in the image above.

The replication process in Kafka is done in parallel by multiple threads being opened by the node. As the threads are opened to exploit parallelism as much as possible, a very high throughput system is achieved in Kafka. After a message is replicated a certain number of times, a write is called to be accepted but the replication of message continues until the replication factor is achieved for the data.

14. Conclusion

In this lesson, we looked at how easy and quick it is to construct a Spring Boot app with Apache Kafka integrated into it. Apache Kafka has grown from a simple Apache project to a production-grade project which can manage millions of request each second when deployed with the correct number of nodes in its cluster, assigned proper memory and managed correctly through Zookeeper. Apache Kafka is one of the most promising skill to have in a software engineer jobs and can cover many use cases like website-tracking, real-time messaging application and much more.

The scale at which Apache Kafka can manage messages on its topics and partitions is really humungous and the architecture route it takes to become so much scalable is inspiring to many other projects. The scailability and real-time processing speed it promises it offers makes sure that it solves many of your problems in projects which needs to scale very much.

15. Download the Source Code

This was an example of Apache Kafka integration with Spring Framework.

Apache Kafka Java Example(Producer + Consumer)

In this tutorial, we will be developing a sample apache kafka java application using maven. We will be configuring apache kafka and zookeeper in our local machine and create a test topic with multiple partitions in a kafka broker.We will have a separate consumer and producer defined in java that will produce message to the topic and also consume message from it.We will also take a look into how to produce messages to multiple partitions of a single topic and how those messages are consumed by consumer group.

First of all, let us get started with installing and configuring Apache Kafka on local system and create a simple topic with 1 partition and write java program for producer and consumer.The project will be a maven based project. After this, we will be creating another topic with multiple partitions and equivalent number of consumers in a consumer-group to balance the consuming between the partitions. Also, we will be having multiple java implementations of the different consumers.

Zookeeper Setup On windows

Assuming that you have jdk 8 installed already let us start with installing and configuring zookeeper on Windows.Download zookeeper from I have downloaded zookeeper version 3.4.10 as in the kafka lib directory, the existing version of zookeeper is 3.4.10.Once downloaded, follow following steps:

1. Extract it and in my case I have extracted kafka and zookeeper in following directory:

2. Once this is extracted, let us add zookeeper in the environment variables.For this go to Control PanelAll Control Panel ItemsSystem and click on the Advanced System Settings and then Environment Variables and then edit the system variables as below:

3. Also, edit the PATH variable and add new entry as %ZOOKEEPER_HOME%bin for zookeeper.

4. Rename file C:Dsoftwareskafka-newzookeeper-3.4.10zookeeper-3.4.10confzoo_sample.cfg to zoo.cfg

5. Now, in the command prompt, enter the command zkserver and the zookeeper is up and running on http://localhost:2181

Kafka Setup On windows

Head over to and download Scala 2.12. This version has scala and zookepper already included in it.Follow below steps to set up kafka.

1. Unzip the downloaded binary. In my case it is — C:Dsoftwareskafka_2.12-1.0.1

2. Go to folder C:Dsoftwareskafka_2.12-1.0.1config and edit

4. Now open a new terminal at C:Dsoftwareskafka_2.12-1.0.1.

5. Execute .binwindowskafka-server-start.bat to start Kafka. Since, we have not made any changes in the default configuration, Kafka should be up and running on http://localhost:9092

Let us create a topic with a name devglan-test

Above command will create a topic named devglan-test with single partition and hence with a replication-factor of 1. This will be a single node — single broker kafka cluster.

Now let us create a producer and consumer for this topic.

Producer can produce messages and consumer can consume messages in the following way from the terminal

Kafka Architecture

Producers are the data source that produces or streams data to the Kafka cluster whereas the consumers consume those data from the Kafka cluster.

Kafka cluster is a collection of no. of brokers and clients do not connect directly to brokers. Instead, clients connect to c-brokers which actually distributes the connection to the clients.

Kafka cluster has multiple brokers in it and each broker could be a separate machine in itself to provide multiple data backup and distribute the load.

Each Broker contains one or more different Kafka topics. For example, Broker 1 might contain 2 different topics as Topic 1 and Topic 2. Now each topic of a single broker will have partitions. This helps in replicated commit log service and provides resilience.

Kafka topics provide segregation between the messages produced by different producers. For example, the sales process is producing messages into a sales topic whereas the account process is producing messages on the account topic. Each topic partition is an ordered log of immutable messages

Anatomy of a Topic

Let us assume we have 3 partitions of a topic and each partition starts with an index 0. The write operation starts with the partition 0 and the same data is replicated in other remaining partitions of a topic. Now, the consumer can start consuming data from any one of the partitions from any desired offset.

Offset defines the location from where any consumer is reading a message from a partition.

As we saw above, each topic has multiple partitions. Now, let us see how these messages of each partition are consumed by the consumer group. A consumer group is a group of consumers and each consumer is mapped to a partition or partitions and the consumer can only consume messages from the assigned partition.

If there are 3 consumers in a consumer group, then in an ideal case there would be 3 partitions in a topic. But if there are 4 consumers but only 3 partitions are available then any one of the 4 consumer won’t be able to receive any message. We will see this implementation below:

If there are 2 consumers for a topic having 3 partitions, then rebalancing is done by Kafka out of the box.

Kafka Maven Dependency

We require kafka_2.12 artifact as a maven dependency in a java project. It has kafka-clients,zookeeper, zookepper client,scala included in it.

Kafka Producer in Java

A Kafka producer is instantiated by providing a set of key-value pairs as configuration.The complete details and explanation of different properties can be found here.Here, we are using default serializer called StringSerializer for key and value serialization.These serializer are used for converting objects to bytes.Similarly, devglan-test is the name of the broker.Finally block is must to avoid resource leaks.

It will send messages to the topic devglan-test.

Kafka Consumer in Java

A consumer is also instantiated by providing properties object as configuration.Similar to the StringSerialization in producer, we have StringDeserializer in consumer to convert bytes back to Object. is a must have property and here it is an arbitrary value.This value becomes important for kafka broker when we have a consumer group of a broker.With this group id, kafka broker ensures that the same message is not consumed more then once by a consumer group meaning a message can be only consumed by any one member a consumer group.

Following is a sample output of running By default, there is a single partition of a topic if unspecified.

Creating Topics with Multiple Partition

Now, we will be creating a topic having multiple partitions in it and then observe the behaviour of consumer and producer.As we have only one broker, we have a replication factor of 1 but we have have a partition of 3.

Now, it’s time to produce message in the topic devglan-partitions-topic . We can do it in 2 ways. Either producer can specify the partition in which it wants to send the message or let kafka broker to decide in which partition to put the messages. By default, kafka used Round Robin algo to decide which partition will be used to put the message.

Hence, as we will allow kafka broker to decide this, we don’t require to make any changes in our java producer code.

But since we have, 3 partitions let us create a consumer group having 3 consumers each having the same group id and consume the message from the above topic. Ideally we will make duplicate with name and and run each of them individually.

Now, start all the 3 consumers one by one and then the producer. You can see in the console that each consumer is assigned a particular partition and each consumer is reading messages of that particular partition only.


In this article, we discussed about setting up kafka in windows local machine and creating Kafka consumer and producer on Java using a maven project.You can share your feedback in the comment section below.

If You Appreciate This, You Can Consider:

  • Like us at: or follow us at
  • Share this article on social media or with your teammates.

About The Author

A technology savvy professional with an exceptional capacity to analyze, solve problems and multi-task. Technical expertise in highly scalable distributed systems, self-healing systems, and service-oriented architecture. Technical Skills: Java/J2EE, Spring, Hibernate, Reactive Programming, Microservices, Hystrix, Rest APIs, Java 8, Kafka, Kibana, Elasticsearch, etc.

apache-kafka Начало работы с apache-kafka


Kafka — это высокопроизводительная система обмена сообщениями для публикации-подписки, реализованная как распределенная, разделенная, реплицированная служба журнала фиксации.

Взято с официального сайта Кафки


Один брокер Kafka может обрабатывать сотни мегабайт от чтения и записи в секунду от тысяч клиентов.


Kafka спроектирован таким образом, чтобы один кластер мог служить центральной базой данных для крупной организации. Он может быть эластично и прозрачно расширен без простоя. Потоки данных разделяются и распределяются по кластеру машин, чтобы потоки данных превышали возможности любой отдельной машины и позволяли кластерам скоординированных потребителей


Сообщения сохраняются на диске и реплицируются внутри кластера, чтобы предотвратить потерю данных. Каждый брокер может обрабатывать терабайты сообщений без влияния на производительность.

Распространяется по дизайну

Kafka имеет современную кластер-ориентированную конструкцию, которая обеспечивает надежную прочность и отказоустойчивость.

Установка или настройка

Шаг 1 . Установка Java 7 или 8

Шаг 2 . Загрузите Apache Kafka по адресу:

Например, мы попробуем загрузить Apache Kafka

Шаг 3 . Извлеките сжатый файл.

В окне: щелкните правой кнопкой мыши -> Извлечь здесь

Шаг 4 . Запустить Zookeeper

Шаг 5 . Запустить сервер Kafka


Apache Kafka ™ — это распределенная потоковая платформа.

Что значит

1-Он позволяет публиковать и подписываться на потоки записей. В этом отношении он похож на очередь сообщений или корпоративную систему обмена сообщениями.

2-Он позволяет сохранять потоки записей отказоустойчивым способом.

3-Он позволяет обрабатывать потоки записей по мере их возникновения.

Он используется для двух широких классов приложений:

1-построение потоковых данных в реальном времени, которые надежно получают данные между системами или приложениями

2-потоковые приложения реального времени, которые преобразуют или реагируют на потоки данных

Скрипты консоли Kafka отличаются для платформ Unix и Windows. В примерах вам может потребоваться добавить расширение в соответствии с вашей платформой. Linux: скрипты, расположенные в bin/ с расширением .sh . Windows: скрипты, расположенные в binwindows и с расширением .bat .


Шаг 1. Загрузите код и разблокируйте его:

Шаг 2. Запустите сервер.

чтобы иметь возможность удалять темы позже, откройте и установите delete.topic.enable в true.

Kafka в значительной степени полагается на zookeeper, поэтому вам нужно сначала начать его. Если вы его не установили, вы можете использовать удобный сценарий, упакованный с помощью kafka, чтобы получить быстрый и грязный одноузловой экземпляр ZooKeeper.

Шаг 3: убедитесь, что все работает

Теперь у вас должен быть zookeeper, слушая localhost:2181 и один брокер kafka на localhost:6667 .

Создать тему

У нас только один брокер, поэтому мы создаем тему без коэффициента репликации и только один раздел:

Проверьте свою тему:

отправлять и получать сообщения

На другом терминале запустите производителя и отправьте несколько сообщений. По умолчанию инструмент отправляет каждую строку в виде отдельного сообщения для брокера без специальной кодировки. Напишите несколько строк и выйдите с помощью CTRL + D или CTRL + C:

Сообщения должны появляться в потребительском термине.

Стоп кафка

запустить кластер с несколькими брокерами

В приведенных выше примерах используется только один брокер. Чтобы настроить настоящий кластер, нам просто нужно запустить более одного сервера kafka. Они будут автоматически координировать себя.

Шаг 1. Чтобы избежать столкновения, мы создаем файл для каждого брокера и изменяем свойства конфигурации id , port и logfile .

Изменить свойства для каждого файла, например:

Шаг 2: запустите трех брокеров:

Создание реплицированной темы

На этот раз есть дополнительная информация:

  • «Лидер» — это узел, ответственный за все чтения и записи для данного раздела. Каждый узел будет лидером для случайно выбранной части разделов.
  • «replicas» — это список узлов, которые реплицируют журнал для этого раздела независимо от того, являются ли они лидером или даже если они в настоящее время живы.
  • «isr» — это набор реплик «in-sync». Это подмножество списка реплик, который в настоящее время жив и подхвачен лидеру.

Обратите внимание, что ранее созданный раздел остается неизменным.

проверка отказоустойчивости

Опубликуйте сообщение в новой теме:

Убейте лидера (1 в нашем примере). В Linux:

Посмотрите, что произошло:

Руководство переключилось на брокер 2 и «1» в несинхронный режим. Но сообщения все еще существуют (используйте потребитель, чтобы проверить самостоятельно).

Роль Apache Kafka в Big Data и DevOps: краткий ликбез и практические кейсы

Мы уже упоминали Apache Kafka в статье про промышленный интернет вещей (Industrial Internet Of Things, IIoT). Сегодня поговорим о том, где и для чего еще в Big Data проектах используется эта распределённая, горизонтально масштабируемая система обработки сообщений.

Как работает Apache Kafka

Apache Kafka позволяет в режиме онлайн обеспечить сбор и обработку следующих данных:

  • поведение пользователя на сайте;
  • потоки информации с множества конечных устройств IoT и IIoT («сырые данные»);
  • агрегация журналов работы приложений;
  • агрегация статистики из распределенных приложений для корпоративных витрин данных (ETL-хранилищ);
  • журналирование событий.

Яркий пример использования Apache Kafka – непрерывная передача информации со smart-периферии (конечных устройств) в IoT-платформу, когда данные не только передаются, но и обрабатываются множеством клиентов, которые называются подписчиками (consumers). В роли подписчиков выступают приложения и программные сервисы. Здесь имеют место отложенные вычисления, когда подписчиков меньше, чем сообщений от издателей – источников данных (producer). Сообщения (messages) записываются по разделам (partition) темы (topic) и хранятся в течении заданного периода. Подписчики сами опрашивают Kafka на предмет наличия новых сообщений, и указывают, какие записи им нужно прочесть, увеличивая или уменьшая смещение к нужной записи. Записанные события могут переигрываться или обрабатываться повторно [1].

Принцип действия распределенной Big Data системы обработки сообщений Apache Kafka

Зачем нужна Кафка в Big Data

Поскольку сообщения скапливаются в топике до их обработки подписчиками, Apache Kafka также называют брокером сообщений и средством для управления очередями в Big Data системах с высокой пропускной способностью сети (сотни тысяч сообщений в секунду). Однако, в отличие от RabbitMQ, другой популярной системы управления очередью сообщений, Apache Kafka является, прежде всего, распределенным реплицированным журналом фиксации изменений [2]. Чем еще отличаются эти брокеры сообщений, читайте в нашей новой статье. Именно с журналированием связаны ключевые сценарии использования Kafka (use-cases) и особенности программной реализации этой системы.

В частности, если необходимо сформировать общий журнал поведения всех пользователей приложения, Кафка поможет собрать и агрегировать логи каждого сеанса от каждого клиента в потоковом режиме (онлайн) [3]. Эта информация, в свою очередь, может использоваться в ETL-процессах (Extract, Transform, Load) для использования в дэшбордах систем интеллектуальной бизнес-аналитики (BI, Business Intelligence) [4].

Еще один типичный пример Apache Kafka в процессах обработки логов – это сбор и агрегация физических файлов журналов с серверов и помещение их в одном месте, например, в HDFS – файловой системе Apache Hadoop. При этом Кафка обеспечивает чистую абстракцию журнала или событий данных в потоке сообщений. Это значительно снижает задержку обработки Big Data, поддерживая горизонтальное масштабирование с множеством источников данных и распределенными потребителями [5].

Источники и получатели сообщений в Апач Кафка

Примеры применения Apache Kafka в Big Data системах

Итак, Кафка позволяет централизовать сбор, передачу и обработку большого количества сообщений в непрерывных информационных потоках, а также хранить эти большие данные, не волнуясь о рисках их потери и производительности системы. С архитектурной точки зрения Apache Kafka может выполнять роль связующего элемента событийно-ориентированной Big Data системы, обеспечивающем взаимодействие отделенных друг от друга микросервисов [1]. Этот факт обусловливает необходимость владения навыками администрирования Apache Kafka для современного DevOps-специалиста, архитектора корпоративной модели данных и инженера Big Data.

С учетом распространения технологий Big Data и цифровизации различных отраслей экономики, в популярность Kafka ИТ-мире растет стремительным образом. Разумеется, наиболее часто эта распределённая, горизонтально масштабируемая система обработки сообщений востребована в организациях, работающих с большими данными.

К примеру, изначально созданный компаний LinkedIn в 2011 году для своих внутренних нужд, Apache Кафка используется этой соцсетью для потоковой передачи данных о деятельности и операционных показателях приложений (LinkedIn News Feed, LinkedIn Today и пр.). Разработчик Big Data систем социальной аналитики, американская компания DataSift, применяет Кафка в качестве коллектора для мониторинга событий и трекера потребления потоков данных пользователями в режиме реального времени. В Twitter Kafka является частью инфраструктуры потоковой обработки.

Foursquare, социальная сеть с функцией геопозиционирования для работы с мобильными устройствами, в т.ч. сотовых телефонов без GPS, использует Apache Kafka для передачи сообщений между онлайн и офлайн-системами, а также для интеграции средств мониторинга в свою Big Data инфраструктуру на базе Hadoop. В корпорации IBM Кафка применяется для обмена сообщениями между микросервисами, обработки событийных и потоковых данных в системах аналитики [5].

В России основными предприятиями, использующими Apache Кафка, являются высокотехнологичные компании финансового сектора, ИТ и телеком: Сбербанк, Тинькофф, Альфа-Банк, Вымпелком, МТС, Ростелеком и прочие организации, работающие с Big Data и Internet of Things. Как именно используется этот брокер сообщений в системах машинного обучения, читайте в нашей следующей статье. А как Кафка используется вместе с другим популярным в мире Big Data DevOps-инструментом, системой автоматизированного управления контейниризованными приложениями, Kubernetes, читайте здесь.

Apache Кафка как центральное звено Big Data и IoT-системы

Станьте востребованным специалистом по Apache Кафка на практических курсах Администрирование кластера Kafka в лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов для руководителей, аналитиков, архитекторов, инженеров и исследователей Big Data в Москве.

Kafka Java Client¶

Confluent Platform includes the Java producer and consumer shipped with Apache KafkaВ®.

Java Client installation¶

All JARs included in the packages are also available in the Confluent Maven repository. Here’s a sample POM file showing how to add this repository:

The Confluent Maven repository includes compiled versions of Kafka.

To reference the Kafka version 2.4 that is included with Confluent Platform 5.4.1, use the following in your pom.xml :

Version names of Apache Kafka vs. Kafka in Confluent Platform: Confluent always contributes patches back to the Apache KafkaВ® open source project. However, the exact versions (and version names) being included in Confluent Platform may differ from the Apache artifacts when Confluent Platform and Kafka releases do not align. If they are different, Confluent keeps the groupId and artifactId identical, but appends the suffix -ccs to the version identifier of the Confluent Platform version to distinguish these from the Apache artifacts.

You can reference artifacts for all Java libraries that are included with Confluent Platform. For example, to use Confluent Platform serializers that integrate with the rest of the Confluent Platform you would include the following in your pom.xml :

Kafka Producer¶


The Java producer is constructed with a standard Properties file.

Configuration errors will result in a raised KafkaException from the constructor of KafkaProducer .

Asynchronous writes¶

The Java producer includes a send() API which returns a future which can be polled to get the result of the send.

This producer example shows how to invoke some code after the write has completed you can also provide a callback. In Java this is implemented as a Callback object:

In the Java implementation you should avoid doing any expensive work in this callback since it is executed in the producer’s IO thread.

Synchronous writes¶

Kafka Consumer¶


The Java consumer is constructed with a standard Properties file.

Configuration errors will result in a KafkaException raised from the constructor of KafkaConsumer .

Basic usage¶

The Java client is designed around an event loop which is driven by the poll() API. This design is motivated by the UNIX select and poll system calls. A basic consumption loop with the Java API usually takes the following form:

There is no background thread in the Java consumer. The API depends on calls to poll() to drive all of its IO including:

  • Joining the consumer group and handling partition rebalances.
  • Sending periodic heartbeats if part of an active generation.
  • Sending periodic offset commits (if autocommit is enabled).
  • Sending and receiving fetch requests for assigned partitions.

Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to poll() . This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.

This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it in its normal record processing. The max.poll.records configuration option places an upper bound on the number of records returned from each call. You should use both poll() and max.poll.records with a fairly high session timeout (e.g. 30 to 60 seconds), and keeping the number of records processed on each iteration bounded so that worst-case behavior is predictable.

If you fail to tune these settings appropriately, the consequence is typically a CommitFailedException raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens since the consumer silently ignores commit failures internally (unless it’s occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.

Java Client code examples¶

Basic poll loop¶

The consumer API is centered around the poll() method, which is used to retrieve records from the brokers. The subscribe() method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to subscribe() to setup the topics of interest and then a loop which calls poll() until the application is shut down.

The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling poll() . In the consumer example below, the poll loop is wrapped in a Runnable which makes it easy to use with an ExecutorService .

The poll timeout is hard-coded to 500 milliseconds. If no records are received before this timeout expires, then poll() will return an empty record set. It’s not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.

To shut down the consumer, a flag is added which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down since it might be triggered while it is in poll() . A better approach is provided in the next example.

Note that you should always call close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired. Latch is added to this example to ensure that the consumer has time to finish closing before finishing shutdown.

Shutdown with wakeup¶

An alternative pattern for the poll loop in the Java consumer is to use Long.MAX_VALUE for the timeout. To break from the loop, you can use the consumer’s wakeup() method from a separate thread. This will raise a WakeupException from the thread blocking in poll() . If the thread is not currently blocking, then this will wakeup the next poll invocation.

Synchronous commits¶

The simplest and most reliable way to manually commit offsets is using a synchronous commit with commitSync() . As its name suggests, this method blocks until the commit has completed successfully.

In this example, a try/catch block is added around the call to commitSync . The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.

First you can adjust the setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.

The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anway). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations.

For now, you should set large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close() ). This should be rare in practice.

You should be careful in this example since the wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.

Delivery guarantees¶

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery. But you must be a little careful with the commit failure, so you should change doCommitSync to return whether or not the commit succeeded. There’s also no longer any need to catch the WakeupException in the synchronous commit.

Correct offset management is crucial because it affects delivery semantics .

Рейтинг статьи
Читайте так же:
Bufferedreader в java
Ссылка на основную публикацию