Недавно была выпущена новая версия Ceph. Эта версия содержит несколько новых функций, расширяющих Ceph как подходящее решение SDS для больших данных и масштабирования данных машинного обучения/ИИ. Одна из новых функций, Bucket Notifications, обеспечивает связь Amazon SNS-SQS между целями RGW и MQ и дает возможность получать уведомления каждый раз, когда объект создается/удаляется. Таким образом, мы могли бы определить тему, в которую RGW будет отправлять уведомления (также известная как тема SNS), и очередь, ожидающую поступления уведомлений (также известная как SQS). Поддерживаемые целевые конечные точки — RabbitMQ (AMQP), Kafka и HTTP. В этой демонстрации мы будем говорить о конечной точке Kafka, другие целевые конечные точки выходят за рамки следующей статьи.

Предпосылки

Чтобы запустить эту демонстрацию, у нас должны быть работающие кластеры RHCS4.0 и kafka. Для облегчения обоих развертываний вы можете использовать репозитории kafka-docker и Ceph nano git.

Начиная

В следующей демонстрации мы будем запускать докеризованный кластер kafka, который будет получать все уведомления о создании и удалении объектов. чтобы запустить свой кластер kafka, клонируйте следующий репозиторий:

git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker

Установка

У нас есть возможность предварительно создать тему kafka, которая будет прослушивать наши уведомления ведра. для этого отредактируйте docker-compose-single-broker.yaml и измените значение KAFKA_CREATE_TOPICS, «тест» — это место, где вы меняете название темы. Измените название темы на «хранилище» только для демонстрации, затем запустите:

docker-compose -f docker-compose-single-broker.yml up -d
docker-compose ps 
          Name                        Command               State                         Ports                       
----------------------------------------------------------------------------------------------------------------------
kafka-docker_kafka_1       start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp                            
kafka-docker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
netstat -ntlp | egrep -e "9092|2181"
tcp6       0      0 :::9092                 :::*                    LISTEN      13911/docker-proxy  
tcp6       0      0 :::2181                 :::*                    LISTEN      13931/docker-proxy

Давайте проверим, что RGW прослушивает, и мы можем получить к нему доступ по правильному адресу и порту (выход должен содержать следующий XML):

curl <RGW_ADDRESS>:<RGW_PORT>
<?xml version="1.0" encoding="UTF-8"?><ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Owner><ID>anonymous</ID><DisplayName></DisplayName></Owner><Buckets></Buckets></ListAllMyBucketsResult>

Нам нужно создать пользователя, который будет иметь права на загрузку/удаление объектов и настройку уведомлений корзины:

radosgw-admin user create --uid=test-notifications display-name=test-notifications --access-key=test-notifications --secret=test-notifications

Запуск тестов

Теперь, когда кластер kafka запущен и работает, давайте проверим, что тема создана и начала прослушивать любые новые уведомления:

docker exec -it kafka-docker_kafka_1 bash -c "/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092"
storage
docker exec -it kafka-docker_kafka_1 bash -c "/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic storage --from-beginning"

Если вы не видите никаких сообщений, это нормально, мы еще не отправляли никаких уведомлений. Чтобы настроить уведомления корзины, мы могли бы использовать инструмент под названием notify, который обрабатывает всю необходимую конфигурацию REST для нашего RGW и настраивает тему и уведомление корзины. Для этого:

Извлеките подходящий образ докера:

docker pull shonpaz123/notify

Справка по этому инструменту предоставляется с помощью следующей команды:

docker run shonpaz123/notify -h
usage: notify.py [-h] -e ENDPOINT_URL -a ACCESS_KEY -s SECRET_KEY -b
                 BUCKET_NAME [-k KAFKA_ENDPOINT] [-r RABBITMQ_ENDPOINT] -t
                 TOPIC
optional arguments:
  -h, --help            show this help message and exit
  -e ENDPOINT_URL, --endpoint-url ENDPOINT_URL
                        endpoint url for s3 object storage
  -a ACCESS_KEY, --access-key ACCESS_KEY
                        access key for s3 object storage
  -s SECRET_KEY, --secret-key SECRET_KEY
                        secret key for s3 object storage
  -b BUCKET_NAME, --bucket-name BUCKET_NAME
                        s3 bucket name
  -k KAFKA_ENDPOINT, --kafka-endpoint KAFKA_ENDPOINT
                        kafka endpoint in which rgw will send notifications to
  -r RABBITMQ_ENDPOINT, --rabbitmq-endpoint RABBITMQ_ENDPOINT
                        rabbitmq topic in which rgw will send notifications to
  -t TOPIC, --topic TOPIC
                        kafka topic in which rgw will send notifications to

Например, настройка уведомлений корзины для корзины «test-notifications» будет обрабатываться следующим образом:

docker run shonpaz123/notify -a test-notifications -s test-notifications -b test-notifications -k <KAFKA_ADDRESS>:9092 -t storage -e <RGW_ADDRESS>:<RGW_PORT>

После настройки следующего давайте настроим наши учетные данные S3 и создадим корзину, чтобы мы могли начать загрузку объектов:

export AWS_SECRET_ACCESS_KEY=test-notifications
export AWS_ACCESS_KEY_ID=test-notifications
aws s3 mb s3://test-notifications --endpoint-url http://<RGW_ADDRESS>:<RGW_PORT>

Теперь давайте создадим случайный объект и загрузим его в нашу службу S3, позже мы удалим его, чтобы мы могли проверить оба типа уведомлений:

truncate -s 10M test-notifications
aws s3 cp test-notifications s3://test-notifications --endpoint-url http://<RGW_ADDRESS>:<RGW_PORT>
aws s3 rm s3://test-notifications/test-notifications --endpoint-url http://<RGW_ADDRESS>:<RGW_PORT>

Теперь, если мы перейдем к нашему кластеру kafka, где мы начали использовать уведомления нашей темы, мы должны увидеть два уведомления, одно для создания и одно для удаления:

{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"","eventTime":"2020-02-24 11:23:51.744652Z","eventName":"s3:ObjectCreated:CompleteMultipartUpload","userIdentity":{"principalId":"test-notifications"},"requestParameters":{"sourceIPAddress":""},"responseElements":{"x-amz-request-id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4177.10","x-amz-id-2":"1051-default-default"},"s3":{"s3SchemaVersion":"1.0","configurationId":"test-configuration","bucket":{"name":"test-notifications","ownerIdentity":{"principalId":"test-notifications"},"arn":"arn:aws:s3:::test-notifications","id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4181.1"},"object":{"key":"test-notifications","size":0,"etag":"669fdad9e309b552f1e9cf7b489c1f73-2","versionId":"","sequencer":"47B2535E2DE4622C","metadata":[{"key":"x-amz-content-sha256","val":"6fc9f44742bb1f04c293d42949652effd5f52a4230d45b1a0f2dcee53cee81e7"},{"key":"x-amz-date","val":"20200224T112351Z"}],"tags":[]}},"eventId":""}
{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"","eventTime":"2020-02-24 11:24:57.746365Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"test-notifications"},"requestParameters":{"sourceIPAddress":""},"responseElements":{"x-amz-request-id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4177.11","x-amz-id-2":"1051-default-default"},"s3":{"s3SchemaVersion":"1.0","configurationId":"test-configuration","bucket":{"name":"test-notifications","ownerIdentity":{"principalId":"test-notifications"},"arn":"arn:aws:s3:::test-notifications","id":"6d95e0d4-5f6e-467f-82a2-ad8b8616157c.4181.1"},"object":{"key":"test-notifications","size":0,"etag":"669fdad9e309b552f1e9cf7b489c1f73-2","versionId":"","sequencer":"89B2535EE6037D2C","metadata":[{"key":"x-amz-content-sha256","val":"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"},{"key":"x-amz-date","val":"20200224T112457Z"}],"tags":[]}},"eventId":""}

Мы видим два уведомления, которые достигли нашей темы kafka, а в разделе «eventName» мы можем видеть поля s3: ObjectCreated и s3: ObjectRemoved.

Вывод

В этой демонстрации мы увидели, как Ceph становится неотъемлемой частью мира больших данных не только благодаря своей огромной емкости, но и благодаря предоставлению подходящих функций для более легкого управления нашими конвейерами данных.