автордың кітабынан сөз тіркестері Kafka Streams в действии. Приложения и микросервисы для работы в реальном времени
В Kafka Streams существует три типа окон:
• сеансовые;
• «кувыркающиеся» (tumbling);
• скользящие/«прыгающие» (sliding/hopping).
Какое выбрать — зависит от бизнес-требований. «Кувыркающиеся» и «прыгающие» окна ограничиваются по времени, в то время как ограничения сеансовых связаны с действиями пользователей — длительность сеанса (-ов) определяется исключительно тем, насколько активно ведет себя пользователь. Главное — не забывать, что все типы окон основываются на метках даты/времени записей, а не на системном времени.
я парадигмы MapReduce
Функции отображения (map) и свертки (reduce) не были чем-то новым на момент разработки компанией Google MapReduce. Уникальны
Метки даты/времени в Kafka Streams
В подразделе 2.4.4 мы обсуждали метки даты/времени в записях Kafka. В этом разделе мы поговорим про использование меток даты/времени в Kafka Streams. Метки даты/времени играют важную роль в следующих ключевых аспектах функциональности Kafka Streams:
• в соединении потоков данных;
• обновлении журналов изменений (API KTable);
Для выполнения соединения в Kafka Streams необходимо, чтобы все его участники были совместно секционированы, то есть у них было одинаковое число секций и ключи одного типа. В результате при вызове метода join() в листинге 4.13 оба экземпляра KStream проверяются на предмет необходимости повторного секционирования.
Примечание
Участвующие в соединении экземпляры интерфейса GlobalKTable повторного секционирования не требуют.
В подразделе 4.4.2 мы применяли к transactionStream метод selectKey() и сразу после этого осуществляли ветвление по возвращаемым объектам KStream. А поскольку метод selectKey() изменяет ключи, то как объект coffeeStream, так и electronicsStream требуют повторного секционирования. Имеет смысл еще раз сказать, что повторное секционирование требуется для того, чтобы гарантировать попадание записей с идентичными ключами в одну секцию. Повторное секционирование выполняется автоматически. Кроме того, при запуске приложения Kafka Streams проверяется, одинаковое ли число секций в участвующих в соединении топиках, и при обнаружении расхождений генерирует исключение TopologyBuilderException. За то, чтобы участвующие в соединении ключи были одного типа, отвечает разработчик.
Совместное секционирование также требует, чтобы все генераторы Kafka использовали один и тот же класс секционирования при записи в топики-источники Kafka Streams. Аналогично вы должны применять один и тот же объект StreamPartitioner при любых операциях записи в топики-стоки Kafka Streams посредством метода KStream.to(). Если же вы придерживаетесь стратегий секционирования по умолчанию, то можете о них не волноваться.
Продолжим разговор о соединениях и посмотрим на остальные их варианты.
На рис. 4.14 показано, как выглядит обновленная топология обработки с учетом листинга 4.11. Как вы уже видели, при изменении ключа может потребоваться повторное секционирование данных. Это справедливо и для настоящего примера. Почему же на рисунке нет шага повторного секционирования?
Хотя Kafka Streams предоставляет очень широкие возможности настройки и множество свойств, которые можно менять под свои нужды, в нашем первом примере будут использоваться только два параметра конфигурации — APPLICATION_ID_CONFIG и BOOTSTRAP_SERVERS_CONFIG:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "yelling_app_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Примечание
Передавать объекты Serde объектам Consumed и Produced вовсе не обязательно. Если этого не сделать, приложение будет использовать указанный в конфигурации сериализатор/десериализатор. Кроме того, с помощью классов Consumed и Produced можно задать объект Serde только для ключа или только для значения.
В подразделе 2.5.5 я описал использование пула потоков и подписку нескольких потребителей (из одной группы) на одни и те же топики. Хотя Kafka и выравнивает нагрузку по топикам/секциям между всеми потребителями, назначение топиков и секций носит недетерминистский характер: нельзя предугадать, какие пары топиков/секций получит каждый из потребителей.
В классе KafkaConsumer имеются методы, с помощью которых можно подписаться на конкретные топик и секцию:
TopicPartition fooTopicPartition_0 = new TopicPartition("foo", 0);
TopicPartition barTopicPartition_0 = new TopicPartition("bar", 0);
consumer.assign(Arrays.asList(fooTopicPartition_0, barTopicPartition_0));
У назначения топиков/секций вручную есть свои недостатки:
• в случае отказа одной из машин не произойдет автоматического переназначения секций топиков, даже для потребителей с одним идентификатором группы. Для изменения назначений понадобится выполнить еще один вызов метода consumer.assign;
• для фиксации используется группа, указанная в настройках потребителя, но, поскольку все потребители будут функционировать сами по себе, имеет смысл задать для каждого из них свой уникальный идентификатор группы.
Вышеупомянутый метод Producer.send принимает в качестве параметра экземпляр типа Callback. После подтверждения получения записи ведущим брокером генератор инициирует выполнение метода Callback.onComplete. Только один из аргументов метода Callback.onComplete будет непустым. В данном случае нас интересует только вывод трассы вызовов при ошибке, так что нам важно, чтобы объект исключения был не пуст. Возвращаемый фьючерс после подтверждения получения записи сервером выдает объект типа RecordMetadata.
определение
В листинге 2.3 метод Producer.send возвращает объект Future, который представляет собой результат выполнения асинхронной операции. Что важнее, Future дает возможность извлечь результаты асинхронных операций отложенным образом, вместо того чтобы ждать их завершения. Более подробную информацию о фьючерсах вы можете найти в документации Java, в разделе «Интерфейс Future<V>»: http:// mng.bz/0JK2.
Генераторы Kafka являются потокобезопасными. Отправка данных в Kafka производится асинхронно — возврат из метода Producer.send происходит сразу же после помещения генератором записи во внутренний буфер. Этот буфер отправляет записи пакетами. В зависимости от ваших настроек при отправке сообщения при заполненном буфере генератора вы можете столкнуться с блокировкой.
Листинг 2.3. Пример простого генератора
