Как обрабатывать исключение десериализации и преобразование в новую схему с помощью Spring Cloud Stream?

Мне сложно понять, как правильно обрабатывать исключение десериализации в потоке Spring Cloud. В первую очередь потому, что реализованная структура не поддерживает заголовки, и предполагается, что DLQ представляет собой отдельную схему, чем исходное сообщение. Таким образом, поток процесса должен выглядеть следующим образом: использовать сообщение - ›ошибка десериализации -› DlqHandler - ›сериализовать с НОВОЙ схемой -› отправить в DLQ

Документация, ссылка на которую приведена ниже, не дает хорошего представления о том, возможно ли это вообще. Я видел довольно много примеров SeekToCurrentErrorHandler для Spring-Kafka, но, насколько мне известно, это разные реализации и не соответствуют тому, как я мог бы правильно получить ошибку десериализации, а затем иметь раздел для настраиваемого кода для сериализации в новый формат и перемещения оттуда.

Мой главный вопрос: возможен ли захват исключения десериализации и повторная сериализация с помощью весенних облачных потоков (кафка)?

Документация Spring Cloud для DLQ


person TypicalTypingTom    schedule 10.03.2021    source источник


Ответы (1)


Да, но без использования свойств повторной привязки или DLQ.

Вместо этого добавьте компонент ListenerContainerCustomizer и настройте контейнер слушателя привязки с SeekToCurrentErrorHandler, настроенным для необходимых вам повторных попыток, и, возможно, подклассом DeadLetterPublishingRecoverer, используя правильно настроенный KafkaTemplate и, возможно, переопределив метод createProducerRecord.

person Gary Russell    schedule 10.03.2021