Kafka стало проще с Flask
Что такое Кафка?
Apache Kafka - это отказоустойчивая платформа потоковой передачи событий. При потоковой передаче событий данные собираются в реальном времени и из разных источников, это могут быть данные веб-аналитики, данные термостата или даже база данных. Наряду со сбором данных существует множество ресурсов, предоставляемых вместе с Kafka для управления данными и их обработки, а также для эффективного разделения ресурсов, приоритизируя различные критически важные процессы и умеренно эффективные процессы. Короче говоря, это Кафка и потоковая передача событий. Чтобы узнать больше о Kafka, посмотрите это видео.
Какую пользу / помощь принесет Kafka при помощи выводов потоковой модели?
Большинство моделей глубокого обучения развертываются через Flask через вызовы REST API. Позже, чтобы развернуть его с помощью сервера; разработчики используют серверы типа gunicorn
и uvicorn
с разным количеством рабочих и потоков. Это нормально, пока вы не делаете только вывод, когда у вас есть только одна модель, и не требуется много времени для вывода. Но если есть выходы из комбинаций разных моделей или даже если после логического вывода есть несколько шагов, лучше иметь конвейер потоковой обработки.
Например, пользователь хочет обобщить всю книгу, пожелания и электронное письмо, а также уведомление на платформе после завершения процесса, если вы будете придерживаться подхода REST API, сначала вам нужно будет обработать запрос на вывод, а затем отправьте электронное письмо, а затем передайте его через контроллер уведомлений, чтобы отправить уведомление. Если вы достаточно умен, вы бы сказали, что я могу использовать что-то вроде ray
для параллельного запуска электронной почты и части уведомлений. Да, можно, но уверены ли вы, что ray
отказоустойчив? Так что в подобных задачах можно использовать силу Кафки.
Как показано на схеме ниже, у вас может быть вызов REST API для передачи данных для вывода; тогда флеш-сервер поместит данные в очередь / тему, а потребитель логического вывода будет продолжать получать данные из этой темы. Потребитель сделает вывод, а затем отправит необходимые данные поставщику электронной почты и поставщику уведомлений, и эти производители снова поместят эту информацию и данные в две разные темы: тему электронной почты и тему уведомления. Теперь потребитель электронной почты и получатель уведомлений будут брать информацию из своей соответствующей темы / очереди, выполнять необходимую обработку и отправлять электронное письмо и уведомление соответственно. Чтобы сохранить информацию в неизменном виде и избежать потери информации, вы можете использовать разные факторы репликации, а чтобы сделать конвейер более гибким и надежным, вы можете иметь группы потребителей и несколько тематических разделов.
В этом блоге мы не будем рассматривать группы потребителей и тематические разделы, а просто рассмотрим простой подход Kafka Consumer and Producer в Python через flask.
Чтобы скачать и настроить Kafka, следуя инструкциям на этой странице
Установите необходимые библиотеки
pip install kafka-python flask flask_cors
Создание темы Kafka
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
- Создание темы вывода
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic INFERENCE
- Создание темы электронного письма
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic EMAIL
- Создание темы уведомления
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic NOTIFICATION
Создание сервера Flask
- Создайте файл
app.py
и добавьте следующие
# app.py from flask import Flask, request, jsonify import json from flask_cors import CORS from kafka import KafkaConsumer, KafkaProducer
app = Flask(__name__)TOPIC_NAME = "INFERENCE" KAFKA_SERVER = "localhost:9092" producer = KafkaProducer( bootstrap_servers = KAFKA_SERVER, api_version = (0, 11, 15) ) @app.route('/kafka/pushToConsumers', methods=['POST']) def kafkaProducer(): req = request.get_json() json_payload = json.dumps(req) json_payload = str.encode(json_payload) # push data into INFERENCE TOPIC producer.send(TOPIC_NAME, json_payload) producer.flush() print("Sent to consumer") return jsonify({ "message": "You will receive an email in a short while with the plot", "status": "Pass"}) if __name__ == "__main__": app.run(debug=True, port = 5000)
Обратите внимание, что Kafka хранит данные только в байтовой форме. Итак, какие бы данные у вас ни были: изображение, текст, звук и т. Д., Они должны быть сначала преобразованы в байты, а затем переданы производителю.
Потребитель вывода Kafka
- Создайте файл
inference_consumer.py
и добавьте следующий
# inference_consumer.py
from kafka import KafkaConsumer, KafkaProducer
import os
import json
import uuid
from concurrent.futures import ThreadPoolExecutor
TOPIC_NAME = "INFERENCE"
KAFKA_SERVER = "localhost:9092"
NOTIFICATION_TOPIC = "NOTIFICATION"
EMAIL_TOPIC = "EMAIL"
consumer = KafkaConsumer(
TOPIC_NAME,
# to deserialize kafka.producer.object into dict
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
producer = KafkaProducer(
bootstrap_servers = KAFKA_SERVER,
api_version = (0, 11, 15)
)
def inferencProcessFunction(data):
. . . . .
. . . . .
. . . . .
# process steps
. . . . .
. . . . .
notification_data = {...}
email_data = {...}
producer.send(NOTIFICATION_TOPIC, notification_data)
producer.flush()
producer.send(EMAIL_TOPIC, email_data)
producer.flush()
for inf in consumer:
inf_data = inf.value
inferencProcessFunction(inf_data)
Потребитель электронной почты Kafka
- Создайте файл
email_consumer.py
и добавьте следующий
# email_consumer.py
from kafka import KafkaConsumer, KafkaProducer import os import json import uuid from concurrent.futures import ThreadPoolExecutor
TOPIC_NAME = "EMAIL"
consumer = KafkaConsumer( TOPIC_NAME, # to deserialize kafka.producer.object into dict value_deserializer=lambda m: json.loads(m.decode('utf-8')), )
def sendEmail(data): . . . . . . . . . . . . . . . # process steps . . . . . . . . . .
for email in consumer: email_data = email.value sendEmail(email_data)
Потребитель уведомлений Kafka
- Создайте файл
notification_consumer.py
и добавьте следующий
# notification_consumer.py
from kafka import KafkaConsumer, KafkaProducer import os import json import uuid from concurrent.futures import ThreadPoolExecutor
TOPIC_NAME = "NOTIFICATION"
consumer = KafkaConsumer( TOPIC_NAME, # to deserialize kafka.producer.object into dict value_deserializer=lambda m: json.loads(m.decode('utf-8')), )
def sendNotification(data): . . . . . . . . . . . . . . . # process steps . . . . . . . . . .
for notification in consumer: notification_data = email.value sendNotification(notification_data)
Как параллельно запускать потребительские процессы без групп потребителей?
Чтобы запустить потребительские процессы параллельно без групп потребителей, вы можете обернуть цикл потребителя for с помощью ThreadPoolExecutor, как показано ниже.
with ThreadPoolExecutor(4) as tpool:
for email in consumer:
email_data = email.value
future = tpool.submit(sendEmail, email_data)
Как запустить этот конвейер?
Здесь у нас будет три разных потребителя и один сервер для обслуживания вызовов REST. Таким образом, вам нужно будет открыть четыре окна терминала и выполнить следующие команды для запуска каждого сценария.
- Запуск сервера с
flask
# folder where app.py file is python app.py
- Или s запуск сервера с
gunicorn
# install gunicorn pip install gunicorn # folder where app.py file is gunicorn -k gthread -w 2 -t 40000 --threads 3 -b:5000 app:app
- Запуск потребителя вывода (новое окно терминала)
# folder where inference_consumer.py file is python inference_consumer.py
- Начальный потребитель электронной почты (новое окно терминала)
# folder where email_consumer.py file is python email_consumer.py
- Запуск потребителя уведомлений (новое окно терминала)
# folder where notification_consumer.py file is python notification_consumer.py
Теперь вы можете попробовать отправить запрос на свой сервер, и вы получите ответ со статусом, и ваш вывод будет поставлен в очередь, и вы вскоре получите электронное письмо и уведомление в зависимости от шагов вашего процесса.
В итоге
В этом блоге мы увидели преимущества использования системы потоковой передачи событий вместо традиционных вызовов API и ожидания ответа. Мы также рассмотрели и рассмотрели структуру практических примеров того, как можно использовать Kafka с их моделями вывода глубокого обучения для потокового вывода и других этапов постобработки и сделать процесс гибким, надежным и простым, разделив работу между разными потребителями. Надеюсь, вам понравилось это читать, и вы сможете использовать эту структуру для некоторых своих реализаций.