Почему запись не может быть удалена с помощью JDBC Sink Connector в Kafka Connect

Свойства моей раковины:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

my connect-avro-distributed.properties :

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

Я отправляю данные следующим образом:

./bin/kafka-avro-console-producer \
  --broker-list 10.0.0.0:9092 --topic orders \
  --property parse.key="true" \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property key.separator="$" \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type":  ["null","int"],"default": null}, {"name":"price","type":  ["null","int"],"default": null}]}' \
  --property schema.registry.url=http://10.0.0.0:8081

Я могу вставлять или обновлять данные в оракуле следующим образом

{"id":3}${"id": {"int":2}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":1453}}

но когда я помещаю это, чтобы удалить запись в оракуле, он не может удалить данные, просто обновляет столбцы как нулевые

{"id":2}${"id": null, "product": null , "quantity": null , "price": null }

как я могу это решить?

заранее спасибо


person Ersin Gülbahar    schedule 13.04.2020    source источник


Ответы (1)


На самом деле вам необходимо создать запись надгробной плиты. В Kafka удаление с помощью JDBC Sink Connector работает следующим образом:

Соединитель может удалять строки в таблице базы данных, когда он использует запись-захоронение, которая является записью Kafka с ненулевым ключом и нулевым значением. Это поведение отключено по умолчанию, что означает, что любые записи захоронения приведут к сбою соединителя, что упрощает обновление соединителя JDBC и сохранение предыдущего поведения.

Удаление можно включить с помощью delete.enabled=true, но только если для pk.mode установлено значение record_key. Это связано с тем, что для удаления строки из таблицы в качестве критерия необходимо использовать первичный ключ.

Включение режима удаления не влияет на файл insert.mode.

Также обратите внимание, что запись такого типа будет удалена только через delete.retention.ms миллисекунд, что в настоящее время по умолчанию составляет 24 часа.


Поэтому попробуйте уменьшить эту конфигурацию в своих свойствах и посмотреть, работает ли она. Для этого вам нужно выполнить следующую команду:

 ./bin/kafka-topics.sh \
    --alter \
    --zookeeper localhost:2181 \
    --topic orders \
    --config retention.ms=100 

Теперь, когда конфигурация завершена, все, что вам нужно сделать, это сгенерировать сообщение с ненулевым ключом и нулевым значением. Обратите внимание, что потребитель консоли Kafka нельзя использовать для создания нулевой записи, поскольку пользовательский ввод анализируется как UTF-8. Следовательно,

{"id":2}${"id": null, "product": null , "quantity": null , "price": null }

не является фактическим сообщением о надгробии.

Однако вы можете использовать kafkacat, но он работает только для сообщений JSON:

# Produce a tombstone (a "delete" for compacted topics) for key 
# "abc" by providing an empty message value which -Z interpretes as NULL:

echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:

но в вашем случае это не сработает, так как вам нужно отправлять сообщения Avro. Поэтому я бы предложил написать очень простой Avro Producer на вашем любимом языке, чтобы вы действительно могли отправить сообщение о надгробии.

person Giorgos Myrianthous    schedule 13.04.2020
comment
Комментарии не для расширенного обсуждения; этот разговор был перемещен в чат. - person Samuel Liew♦; 14.04.2020