Недавно была выпущена новая версия Ceph. Эта версия содержит несколько новых функций, расширяющих Ceph как подходящее решение SDS для больших данных и масштабирования данных машинного обучения/ИИ. Одна из новых функций, Bucket Notifications, обеспечивает связь Amazon SNS-SQS между целями RGW и MQ и дает возможность получать уведомления каждый раз, когда объект создается/удаляется. Таким образом, мы могли бы определить тему, в которую RGW будет отправлять уведомления (также известная как тема SNS), и очередь, ожидающую поступления уведомлений (также известная как SQS). Поддерживаемые целевые конечные точки — RabbitMQ (AMQP), Kafka и HTTP. В этой демонстрации мы будем говорить о конечной точке Kafka, другие целевые конечные точки выходят за рамки следующей статьи.
Предпосылки
Чтобы запустить эту демонстрацию, у нас должны быть работающие кластеры RHCS4.0 и kafka. Для облегчения обоих развертываний вы можете использовать репозитории kafka-docker и Ceph nano git.
Начиная
В следующей демонстрации мы будем запускать докеризованный кластер kafka, который будет получать все уведомления о создании и удалении объектов. чтобы запустить свой кластер kafka, клонируйте следующий репозиторий:
git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker
Установка
У нас есть возможность предварительно создать тему kafka, которая будет прослушивать наши уведомления ведра. для этого отредактируйте docker-compose-single-broker.yaml и измените значение KAFKA_CREATE_TOPICS, «тест» — это место, где вы меняете название темы. Измените название темы на «хранилище» только для демонстрации, затем запустите:
docker-compose -f docker-compose-single-broker.yml up -d
docker-compose ps Name Command State Ports ---------------------------------------------------------------------------------------------------------------------- kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:9092->9092/tcp kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
netstat -ntlp | egrep -e "9092|2181" tcp6 0 0 :::9092 :::* LISTEN 13911/docker-proxy tcp6 0 0 :::2181 :::* LISTEN 13931/docker-proxy
Давайте проверим, что RGW прослушивает, и мы можем получить к нему доступ по правильному адресу и порту (выход должен содержать следующий XML):
curl <RGW_ADDRESS>:<RGW_PORT>
<?xml version="1.0" encoding="UTF-8"?><ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Owner><ID>anonymous</ID><DisplayName></DisplayName></Owner><Buckets></Buckets></ListAllMyBucketsResult>
Нам нужно создать пользователя, который будет иметь права на загрузку/удаление объектов и настройку уведомлений корзины:
radosgw-admin user create --uid=test-notifications display-name=test-notifications --access-key=test-notifications --secret=test-notifications
Запуск тестов
Теперь, когда кластер kafka запущен и работает, давайте проверим, что тема создана и начала прослушивать любые новые уведомления:
docker exec -it kafka-docker_kafka_1 bash -c "/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092" storage
docker exec -it kafka-docker_kafka_1 bash -c "/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic storage --from-beginning"
Если вы не видите никаких сообщений, это нормально, мы еще не отправляли никаких уведомлений. Чтобы настроить уведомления корзины, мы могли бы использовать инструмент под названием notify, который обрабатывает всю необходимую конфигурацию REST для нашего RGW и настраивает тему и уведомление корзины. Для этого:
Извлеките подходящий образ докера:
docker pull shonpaz123/notify
Справка по этому инструменту предоставляется с помощью следующей команды:
docker run shonpaz123/notify -h usage: notify.py [-h] -e ENDPOINT_URL -a ACCESS_KEY -s SECRET_KEY -b BUCKET_NAME [-k KAFKA_ENDPOINT] [-r RABBITMQ_ENDPOINT] -t TOPIC
optional arguments: -h, --help show this help message and exit -e ENDPOINT_URL, --endpoint-url ENDPOINT_URL endpoint url for s3 object storage -a ACCESS_KEY, --access-key ACCESS_KEY access key for s3 object storage -s SECRET_KEY, --secret-key SECRET_KEY secret key for s3 object storage -b BUCKET_NAME, --bucket-name BUCKET_NAME s3 bucket name -k KAFKA_ENDPOINT, --kafka-endpoint KAFKA_ENDPOINT kafka endpoint in which rgw will send notifications to -r RABBITMQ_ENDPOINT, --rabbitmq-endpoint RABBITMQ_ENDPOINT rabbitmq topic in which rgw will send notifications to -t TOPIC, --topic TOPIC kafka topic in which rgw will send notifications to
Например, настройка уведомлений корзины для корзины «test-notifications» будет обрабатываться следующим образом:
docker run shonpaz123/notify -a test-notifications -s test-notifications -b test-notifications -k <KAFKA_ADDRESS>:9092 -t storage -e <RGW_ADDRESS>:<RGW_PORT>
После настройки следующего давайте настроим наши учетные данные S3 и создадим корзину, чтобы мы могли начать загрузку объектов:
export AWS_SECRET_ACCESS_KEY=test-notifications
export AWS_ACCESS_KEY_ID=test-notifications
aws s3 mb s3://test-notifications --endpoint-url http://<RGW_ADDRESS>:<RGW_PORT>
Теперь давайте создадим случайный объект и загрузим его в нашу службу S3, позже мы удалим его, чтобы мы могли проверить оба типа уведомлений:
truncate -s 10M test-notifications
aws s3 cp test-notifications s3://test-notifications --endpoint-url http://<RGW_ADDRESS>:<RGW_PORT>
aws s3 rm s3://test-notifications/test-notifications --endpoint-url http://<RGW_ADDRESS>:<RGW_PORT>
Теперь, если мы перейдем к нашему кластеру kafka, где мы начали использовать уведомления нашей темы, мы должны увидеть два уведомления, одно для создания и одно для удаления:
{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"","eventTime":"2020-02-24 11:23:51.744652Z","eventName":"s3:ObjectCreated:CompleteMultipartUpload","userIdentity":{"principalId":"test-notifications"},"requestParameters":{"sourceIPAddress":""},"responseElements":{"x-amz-request-id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4177.10","x-amz-id-2":"1051-default-default"},"s3":{"s3SchemaVersion":"1.0","configurationId":"test-configuration","bucket":{"name":"test-notifications","ownerIdentity":{"principalId":"test-notifications"},"arn":"arn:aws:s3:::test-notifications","id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4181.1"},"object":{"key":"test-notifications","size":0,"etag":"669fdad9e309b552f1e9cf7b489c1f73-2","versionId":"","sequencer":"47B2535E2DE4622C","metadata":[{"key":"x-amz-content-sha256","val":"6fc9f44742bb1f04c293d42949652effd5f52a4230d45b1a0f2dcee53cee81e7"},{"key":"x-amz-date","val":"20200224T112351Z"}],"tags":[]}},"eventId":""}
{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"","eventTime":"2020-02-24 11:24:57.746365Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"test-notifications"},"requestParameters":{"sourceIPAddress":""},"responseElements":{"x-amz-request-id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4177.11","x-amz-id-2":"1051-default-default"},"s3":{"s3SchemaVersion":"1.0","configurationId":"test-configuration","bucket":{"name":"test-notifications","ownerIdentity":{"principalId":"test-notifications"},"arn":"arn:aws:s3:::test-notifications","id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4181.1"},"object":{"key":"test-notifications","size":0,"etag":"669fdad9e309b552f1e9cf7b489c1f73-2","versionId":"","sequencer":"89B2535EE6037D2C","metadata":[{"key":"x-amz-content-sha256","val":"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"},{"key":"x-amz-date","val":"20200224T112457Z"}],"tags":[]}},"eventId":""}
Мы видим два уведомления, которые достигли нашей темы kafka, а в разделе «eventName» мы можем видеть поля s3: ObjectCreated и s3: ObjectRemoved.
Вывод
В этой демонстрации мы увидели, как Ceph становится неотъемлемой частью мира больших данных не только благодаря своей огромной емкости, но и благодаря предоставлению подходящих функций для более легкого управления нашими конвейерами данных.