Kafka стало проще с Flask

Что такое Кафка?

Apache Kafka - это отказоустойчивая платформа потоковой передачи событий. При потоковой передаче событий данные собираются в реальном времени и из разных источников, это могут быть данные веб-аналитики, данные термостата или даже база данных. Наряду со сбором данных существует множество ресурсов, предоставляемых вместе с Kafka для управления данными и их обработки, а также для эффективного разделения ресурсов, приоритизируя различные критически важные процессы и умеренно эффективные процессы. Короче говоря, это Кафка и потоковая передача событий. Чтобы узнать больше о Kafka, посмотрите это видео.

Какую пользу / помощь принесет Kafka при помощи выводов потоковой модели?

Большинство моделей глубокого обучения развертываются через Flask через вызовы REST API. Позже, чтобы развернуть его с помощью сервера; разработчики используют серверы типа gunicorn и uvicorn с разным количеством рабочих и потоков. Это нормально, пока вы не делаете только вывод, когда у вас есть только одна модель, и не требуется много времени для вывода. Но если есть выходы из комбинаций разных моделей или даже если после логического вывода есть несколько шагов, лучше иметь конвейер потоковой обработки.

Например, пользователь хочет обобщить всю книгу, пожелания и электронное письмо, а также уведомление на платформе после завершения процесса, если вы будете придерживаться подхода REST API, сначала вам нужно будет обработать запрос на вывод, а затем отправьте электронное письмо, а затем передайте его через контроллер уведомлений, чтобы отправить уведомление. Если вы достаточно умен, вы бы сказали, что я могу использовать что-то вроде ray для параллельного запуска электронной почты и части уведомлений. Да, можно, но уверены ли вы, что ray отказоустойчив? Так что в подобных задачах можно использовать силу Кафки.

Как показано на схеме ниже, у вас может быть вызов REST API для передачи данных для вывода; тогда флеш-сервер поместит данные в очередь / тему, а потребитель логического вывода будет продолжать получать данные из этой темы. Потребитель сделает вывод, а затем отправит необходимые данные поставщику электронной почты и поставщику уведомлений, и эти производители снова поместят эту информацию и данные в две разные темы: тему электронной почты и тему уведомления. Теперь потребитель электронной почты и получатель уведомлений будут брать информацию из своей соответствующей темы / очереди, выполнять необходимую обработку и отправлять электронное письмо и уведомление соответственно. Чтобы сохранить информацию в неизменном виде и избежать потери информации, вы можете использовать разные факторы репликации, а чтобы сделать конвейер более гибким и надежным, вы можете иметь группы потребителей и несколько тематических разделов.

В этом блоге мы не будем рассматривать группы потребителей и тематические разделы, а просто рассмотрим простой подход Kafka Consumer and Producer в Python через flask.

Чтобы скачать и настроить Kafka, следуя инструкциям на этой странице

Установите необходимые библиотеки

pip install kafka-python flask flask_cors

Создание темы Kafka

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
  • Создание темы вывода
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic INFERENCE
  • Создание темы электронного письма
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic EMAIL
  • Создание темы уведомления
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic NOTIFICATION

Создание сервера Flask

  • Создайте файл app.py и добавьте следующие
# app.py

from flask import Flask, request, jsonify
import json
from flask_cors import CORS
from kafka import KafkaConsumer, KafkaProducer
app = Flask(__name__)
TOPIC_NAME = "INFERENCE"
KAFKA_SERVER = "localhost:9092"

producer = KafkaProducer(
    bootstrap_servers = KAFKA_SERVER,
    api_version = (0, 11, 15)
)

@app.route('/kafka/pushToConsumers', methods=['POST'])
def kafkaProducer():
		
    req = request.get_json()
    json_payload = json.dumps(req)
    json_payload = str.encode(json_payload)
		# push data into INFERENCE TOPIC
    producer.send(TOPIC_NAME, json_payload)
    producer.flush()
    print("Sent to consumer")
    return jsonify({
        "message": "You will receive an email in a short while with the plot", 
        "status": "Pass"})

if __name__ == "__main__":
    app.run(debug=True, port = 5000)

Обратите внимание, что Kafka хранит данные только в байтовой форме. Итак, какие бы данные у вас ни были: изображение, текст, звук и т. Д., Они должны быть сначала преобразованы в байты, а затем переданы производителю.

Потребитель вывода Kafka

  • Создайте файл inference_consumer.py и добавьте следующий
# inference_consumer.py

from kafka import KafkaConsumer, KafkaProducer
import os
import json
import uuid
from concurrent.futures import ThreadPoolExecutor

TOPIC_NAME = "INFERENCE"

KAFKA_SERVER = "localhost:9092"

NOTIFICATION_TOPIC = "NOTIFICATION"
EMAIL_TOPIC = "EMAIL"

consumer = KafkaConsumer(
    TOPIC_NAME,
    # to deserialize kafka.producer.object into dict
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

producer = KafkaProducer(
    bootstrap_servers = KAFKA_SERVER,
    api_version = (0, 11, 15)
)

def inferencProcessFunction(data):
	. . . . .
	. . . . .
	. . . . .
	# process steps
	. . . . .
	. . . . .
  
	notification_data = {...}
	email_data = {...}
  producer.send(NOTIFICATION_TOPIC, notification_data)
	producer.flush()
	producer.send(EMAIL_TOPIC, email_data)
	producer.flush()

for inf in consumer:
	
	inf_data = inf.value
  
        inferencProcessFunction(inf_data)

Потребитель электронной почты Kafka

  • Создайте файл email_consumer.py и добавьте следующий
# email_consumer.py
from kafka import KafkaConsumer, KafkaProducer
import os
import json
import uuid
from concurrent.futures import ThreadPoolExecutor
TOPIC_NAME = "EMAIL"
consumer = KafkaConsumer(
    TOPIC_NAME,
    # to deserialize kafka.producer.object into dict
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
def sendEmail(data):
	. . . . .
	. . . . .
	. . . . .
	# process steps
	. . . . .
	. . . . .
for email in consumer:
	
	email_data = email.value
	
	sendEmail(email_data)

Потребитель уведомлений Kafka

  • Создайте файл notification_consumer.py и добавьте следующий
# notification_consumer.py
from kafka import KafkaConsumer, KafkaProducer
import os
import json
import uuid
from concurrent.futures import ThreadPoolExecutor
TOPIC_NAME = "NOTIFICATION"
consumer = KafkaConsumer(
    TOPIC_NAME,
    # to deserialize kafka.producer.object into dict
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
def sendNotification(data):
	. . . . .
	. . . . .
	. . . . .
	# process steps
	. . . . .
	. . . . .
for notification in consumer:
	
	notification_data = email.value
	
	sendNotification(notification_data)

Как параллельно запускать потребительские процессы без групп потребителей?

Чтобы запустить потребительские процессы параллельно без групп потребителей, вы можете обернуть цикл потребителя for с помощью ThreadPoolExecutor, как показано ниже.

with ThreadPoolExecutor(4) as tpool:

	for email in consumer:

		email_data = email.value

		future = tpool.submit(sendEmail, email_data)

Как запустить этот конвейер?

Здесь у нас будет три разных потребителя и один сервер для обслуживания вызовов REST. Таким образом, вам нужно будет открыть четыре окна терминала и выполнить следующие команды для запуска каждого сценария.

  • Запуск сервера с flask
# folder where app.py file is
python app.py
  • Или s запуск сервера с gunicorn
# install gunicorn
pip install gunicorn
# folder where app.py file is 
gunicorn -k gthread -w 2 -t 40000 --threads 3 -b:5000 app:app
  • Запуск потребителя вывода (новое окно терминала)
# folder where inference_consumer.py file is
python inference_consumer.py
  • Начальный потребитель электронной почты (новое окно терминала)
# folder where email_consumer.py file is 
python email_consumer.py
  • Запуск потребителя уведомлений (новое окно терминала)
# folder where notification_consumer.py file is 
python notification_consumer.py

Теперь вы можете попробовать отправить запрос на свой сервер, и вы получите ответ со статусом, и ваш вывод будет поставлен в очередь, и вы вскоре получите электронное письмо и уведомление в зависимости от шагов вашего процесса.

В итоге

В этом блоге мы увидели преимущества использования системы потоковой передачи событий вместо традиционных вызовов API и ожидания ответа. Мы также рассмотрели и рассмотрели структуру практических примеров того, как можно использовать Kafka с их моделями вывода глубокого обучения для потокового вывода и других этапов постобработки и сделать процесс гибким, надежным и простым, разделив работу между разными потребителями. Надеюсь, вам понравилось это читать, и вы сможете использовать эту структуру для некоторых своих реализаций.