Apache Kafka с примером на Java и Spring Boot

Apache Kafka с примером Java Spring Boot

Сегодня я хочу рассказать об Apache Kafka в качестве брокера сообщений. С помощью Kafka мы попытаемся коммуницировать несколько Java Spring Boot приложений.

Начнем с самого простого: что такое Apache Kafka? На сайте википедии сказано что это фреймворк, реализующий программную шину с использованием потоковой обработки; программная платформа с открытым исходным кодом, разработанная Apache Software Foundation, написанная на Scala и Java. Согласен. Понятного мало. Особенно для тех, кто вообще не вкурсе что такое потоковая обработка, шина, брокеры сообщений и т.д.

Если Вы сталкивались с REST или SOAP, то знаете что сервисы могут обмениваться сообщениями. Например один сервис отвечает за авторизацию пользователей, а второй за обработку заказа. Под сервисом я имею в виду отдельное приложение (не важно на Java или другом языке программирования), которое имеет API при вызове которого получают определенный результат. Возможно Вы еще с таким явлением не сталкивались. Важно понять что современные приложения очень редко бывают автономны. Особенно большие веб проекты. Возьмем к примеру ваш любимый банк и его сайт, где Вы можете совершать платежи и другую банковскую активность.

Наверняка он общается со сторонними сервисами не только внутри банка но и по Интернету. Котировки для отображения курса валют могут быть на сайте регулятора. Веб приложению банка нужно сделать запрос чтобы получить актуальные данные на момент операции. В популярных банковских веб приложениях есть возможность заплатить коммуналку. Когда Вы совершаете платеж, приложение делает запрос на сервер коммунальной службы чтобы получить Вашу задолженность и потом перевести средства на нужный счет. После совершения платежа приложение обращается к сервису по отправке уведомлений или СМС, чтобы уведомить Вас о совершенной транзакции.

Пример выше является очень абстрактным но я хочу донести мысль, что приложения постоянно обмениваются информацией между собой. Нам это не видно за оболочкой веб браузера или мобильного приложение. Все происходит на сервере. Но автономных приложений в нашем современном мире почти не осталось.

Более того,  не только веб сервера общаются между собой, но также и другие устройства генерируют события, которые сервера должны обрабатывать. Сейчас даже Ваш холодильник если он более-менее современный пытается куда-то отослать аналитику и статистику на сервер.

В тренде микросервисные приложения, где большой код разбивается на меньшие независимые веб сервисы, которые потом обмениваются информацией при необходимости. Если раньше Вы вызывали метод класса для определенной операции, то сейчас также вызывается API другого сервиса для вызова определенной функции или передачи данных.

Теперь, когда мы разобрались с микросервисами и коммуникацией между серверами, нетрудно понять что обмен сообщениями — задача не из простых. Нужно сформировать запрос, знать URL, получить разрешение чтобы вызывать сервис (как правило API защищен от несанкционированного доступа), отправить запрос, получить ответ, прочитать ответ. И это я только описал так называемый happy flow. Когда все идет хорошо. А ведь бывает и негативный сценарий. Что если сервис который мы вызываем не отвечает? Или у него поменялся ответ? Или что, если один сервис генерирует запросы быстрее чем другой может их обработать? Все эти вещи достаточно сложны и обсуждать их в текущей статье мы не будем. Мы рассмотрим только один кейс из недостатков синхронного общения, чтобы потом плавно перейти в преимущества асинхронного общения, которое возможно с помощью брокеров сообщений, коим является Kafka.

Допустим у Вас есть проект по продажи товаров. Пусть это будет интернет магазин обуви. Он очень большой и популярный. Проект настолько обширный что программный код разделен сразу на несколько подпроектов, чтобы разные команды могли с ним работать не мешая друг другу. Это что-то схожее с микросервисами. У Вас есть главный сервер который принимает заказы. Есть второй сервер, который отправляет СМС уведомления пользователю что его заказ принят. Достаточно простой сценарий: принимаем запрос о покупке, сохраняем заказ, делаем запрос на сервер уведомлений чтобы отправить СМС и возвращаем результат что покупка обработана. Сейчас предположим что оба наши сервера это Spring Boot приложения написанные на 11 java.

Когда мы «серфим» по веб ресурсам — нам очень важна скорость с которой страницы открываются. Согласитесь, очень неприятно ждать пока страница загрузится, пока все картинки станут четкими и тд. Когда наш пользователь в примере выше будет делать запрос чтобы оформить свой заказ, он наверняка захочет чтобы все прошло как можно быстрее. Если мы будем делать обычный синхронный запрос на сервер уведомлений, то в коде приложения нам нужно будет дождаться ответа что запрос прошел и только после этого возвращать пользователю результат.

А что если отправку уведомлений сделать асинхронной? Можно обрабатывать заказ и в это же время сделать запрос на СМС. Дальше не дожидаясь ответа можно продолжить выполнение кода. Таким образом обработка заказа пользователя будет ускорена за счет того, что не нужно ждать ответа от сервера уведомлений.

Вот как раз Apache Kafka может нам помочь с вышеописанным сценарием. Можно положить инструкцию по отправке уведомлений в брокер сообщения и потом уже приложение которое настроено на прослушку темы с сообщением сможет получить и обработать наши инструкции.

Нам не нужно беспокоится что сообщение будет потеряно и не обработано. Даже если сервер, который отвечает за отправку сообщений перегружен — он сможет приступить к обработке наших инструкций немного позже нашей отправки. Кафка гарантирует что сообщение не потеряется и будет доставлено.

Если заглянуть немного глубже за интерфейс Apache Kafka то можно увидеть, что это некая форма лог файла. Есть источник данных, который называется продюсер (producer). Это не тот продюсер, который снимает фильмы или раскручивает певцов.

В данном случае продюсером мы называем того, кто будет генерировать трафик. Producer записывает сообщения (messages) в так называемый топик (topic). Если угодно другой перевод — тема. Можно воспринимать топик как название файла на компьютере. Сообщения хранятся в лог файле определенный период, который можно настраивать. В отличие от других брокеров сообщений (таких как ActiveMQ), Кафка может хранить сообщение даже после его обработки. Таким образом этот инструмент иногда используют в качестве базы данных.

Подписчики (consumer), которые слушают определенную тему опрашивают ее на наличие новых сообщений и потом обрабатывают их по своему усмотрению. Причем в темы может быть много подписчиков и много продюсеров.

Мы пишем в файл сообщения и потом можем их прочитать. Этот файл мы можем программно начать читать не сначала, а с определенного места (offset), который тоже можно настраивать.

Мы не будем подробно останавливать наше внимание на репликации данных в Apache Kafka, настройке топиков, продюсеров, подписчиков и других девопс нюансах, которые также нужно знать и девелоперам. Все это усложнит и так непростую тему. Сейчас наша задача ознакомится с этим инструментом и попробовать выполнить простой пример, который и будет Вашей отправной точкой в более детальном изучении Apache Kafka

Первое чта нам нужно — установить Apache Kafka на свой компьютер. Сделать это достаточно просто: переходим на официальный сайт и выполняем инструкции согласно своей операционной системе. На момент написания этой статьи дополнительно требуется установить ZooKeeper. Это специальный дополнительный сервис по обслуживанию несколько реплик брокера. Сейчас углубляться в эту тему пока не будем. Тем более что, разработчики Кафка обещают что скоро устанавливать зукипер будет не нужно.

После установки можете попробовать выполнить несколько упражнений, которые на официальном сайте по ссылке выше. Таким образом Вы сможете поработать с Кафка из коммандной строки.

Дальше, нам нужно — несколько Spring Boot приложений. Первое будет принимать наш REST запрос и отправлять сообщение в тему брокера. Второе приложение будет принимать сообщения и обрабатывать их. В нашем случае мы просто будем выводить сообщение в консоль.

Я генерирую Spring Boot приложения на сайте start.spring.io и добавляю зависимости для Kafka и Web:

Финальный pom.xml моего kafka-tutorial-producer и kafka-tutorial-producer одинаковый:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.3.3.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.java-master</groupId>
	<artifactId>kafka-tutorial-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafka-tutorial-producer</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>11</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Если не знаете, что такое pom.xml файл — ознакомьтесь со статьей о Мавен.

Чтобы оба приложения можно было запустить одновременно нужно хотя бы одному поменять стандартный порт томкет с 8080 на любой другой. Делается это в application.properties файле. В данно случае я выбрал консумера и поменял ему порт на 8081: server.port=8081

Дальше пишем простой класс с методом produce, который будет ложить сообщения в Apache Kafka тему. Чтобы не изобретать велосипед мы воспользуемся классом KafkaTemplate, который идет со спринг кафка библиотекой. Это параметризированный класс, которому нужно задать тип ключа и значения, которое мы будем передавать. Можно конечно передавать строки, но также можно и сложные объекты, которые состоят из нескольких полей.

Предлагаю создать какой-то несложный класс Message.

package com.javamaster.model;

public class Message {

    private String message;
    private Integer age;

    public Message() {
    }

    public Message(String message, Integer age) {
        this.message = message;
        this.age = age;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                ", age=" + age +
                '}';
    }
}

Типичный класс для хранения данных. Можете попробовать добавить больше полей. Это не преграда для кафка.

После того как мы внедрили зависимость KafkaTemplate в наш сервис, можно вызывать методы этого класса. В данном случае нам нужен только один: send(тема, сообщение). Как я уже писал выше — тема это что-то вроде имя файла на компьютере. Продюсер добавляет сообщения в тему, а подписчик зная имя темы может прослушивать ее на наличие новых сообщений.

В результате наш сервис имеет вот такой вид:

package com.javamaster.producer;

import com.javamaster.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private KafkaTemplate<String, Message> kafkaTemplate;

    public void produce(Message message) {
        System.out.println("Producing the message: " + message);
        kafkaTemplate.send("messages", message);
    }
}

Я не хотел просто по дефолту генерировать сообщения. Мы сейчас подлючим наш ProducerService в контроллер и будем вызывать его из метода контроллера. Сообщение мы будем передавать в запросе контроллер метода:

package com.javamaster.controller;

import com.javamaster.model.Message;
import com.javamaster.producer.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HomeController {

    @Autowired
    private ProducerService producerService;

    @GetMapping("/generate")
    public String generate(@RequestParam String message, @RequestParam Integer age) {
        producerService.produce(new Message(message, age));
        return "OK";
    }
}

Код выше — просто REST контроллер. Для тех кто не в курсе — я настоятельно советую почитать: Spring REST с примером

В проперти файл application.properties нужно указать адрес и порт Apache Kafka и прописать класс сериализации для ключа и значения. Так как мы используем для ключа строку а для значения класс Message то в Kafka наш класс будет передан в виде JSON объекта. Можно конечно выбрать и другой формат сериализации. Главное чтобы подписчик использовал тот же формат десериализации. Иначе сообщение невозможно будет прочитать.

Не забываем также указать Spring Boot хост и порт Apache Kafka сервера. Так как мы все запустили у себя на компьютере,то нужно указать localhost и стандартный кафка порт, если Вы его конечно не поменяли: 9092.

В результате, мой файл application.properties продюсер сервиса выглядит следующим образом:

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Дальше очередь сервиса подписчика.

Здесь все будет еще проще. Создаем аналогичный класс сообщений как и в сервисе продюсера. Дальше создаем класс ConsumerService и в нем метод consume. Чтобы метод начал слушать тему на него навешиваем аннотацию @KafkaListener, в которой указываем название темы и айди группы (group_id). Айди группы это идентификатор группы к которой принадлежит подписчик. Как я уже писал выше, у темы может быть несколько продюсеров и подписчиков одновременно. Их можно объединять в группы. Таким образом разные группы подписчиков могут парралельно обрабатывать тему с разных ее мест.

Наш класс подписчика имеет следующий вид:

package com.javamaster.consumer;

import com.javamaster.model.Message;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @KafkaListener(topics = "messages", groupId = "message_group_id")
    public void consume(Message message){
        System.out.println("Consuming the message: " + message);
    }
}

Не забываем добавить десериализатор для ключа и значения в файл настроек:

server.port=8081
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

Спринг ругался на то что пакет где находился мой класс Message не вызывает доверия. Поэтому пришлось добавить ему spring.kafka.consumer.properties.spring.json.trusted.packages=* чтобы он все пакеты пометил как трастед для json.

Я настройках сервиса консумера я намеренно не добавлял указания для порта и хоста. На самом деле, Spring Boot имеет дефолтные настройки для Кафка: localhost:9092. Если ему не передавать других настроек — он будет брать значения по умолчанию. В примере с продюсером я просто хотел продемонстрировать как эти параметры можно передать. Но в нашем случае spring.kafka.producer.bootstrap-servers=localhost:9092 можно было не писать.

На этом собственно разработка и закончена.

Не забываем запустить Apache Kafka сервер. Стартуем наши 2 приложения и пытаемся в Postman или браузере вызвать эндпоинт продюсера:

В консоли приложений Вы сможете увидеть что продюсер передал сообщение в Apache Kafka, а подписчик успешно его принял и обработал:

Apache Kafka и Spring Boot

Это все, что я хотел показать на примере Spring Boot и брокера сообщений Apache Kafka. Как видите, настроить асинхронный обмен сообщениями между современными веб приложениями не так уж и сложно.

Я записал видео для этого туториала на английском языке. Приятного просмотра!

Понравилась статья? Поделиться с друзьями:
Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: