Смущает параметры Airflow BaseSensorOperator: тайм-аут, poke_interval и режим

Я немного не понимаю, как работают параметры BaseSensorOperator: timeout & poke_interval. Рассмотрим такое использование датчика:

BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout after 12 hours
)

В документации упоминается, что тайм-аут устанавливает для задачи «сбой» после ее завершения. Но я использую soft_fail=True, я не думаю, что он сохраняет такое же поведение, потому что я обнаружил, что задача не выполнена, вместо того, чтобы пропустить после того, как я использовал оба параметра soft_fail и timeout.

Так что же здесь происходит?

  1. Сенсор тыкает каждые 4 часа, и при каждом тычке будет ждать длительность таймаута (12 часов)?
  2. Или тыкает каждые 4 часа, итого 3 тыкает, потом таймаут?
  3. Кроме того, что произойдет с этими параметрами, если я использую mode = reschedule?

Вот документация по BaseSensorOperator

class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.
    Sensor operators keep executing at a time interval and succeed when
    a criteria is met and fail if and when they time out.
    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each tries
    :type poke_interval: int
    :param timeout: Time, in seconds before the task times out and fails.
    :type timeout: int
    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is requried.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the expected time until the criteria is met is. The poke
        inteval should be more than one minute to prevent too much load on
        the scheduler.
    :type mode: str
    """

person Imad    schedule 07.09.2020    source источник


Ответы (2)


Определение терминов

  1. poke_interval: продолжительность ч / б последовательных «тыков» (оценка необходимого условия, которое «ощущается»)

  2. timeout: Просто тыкать на неопределенный срок недопустимо (если, например, ваш код с ошибками высовывается в день, чтобы стать 29, когда месяц равен 2, он будет продолжать тыкать до 4 лет). Таким образом, мы определяем максимальный период, по истечении которого мы прекращаем тыкать и завершаем работу (датчик отмечен либо FAILED, либо SKIPPED)

  3. soft_fail: Обычно (когда soft_fail=False) датчик помечается как FAILED после тайм-аута. Когда soft_fail=True, датчик будет помечен как SKIPPED после тайм-аута.

  4. mode: Это немного сложный

    • Any task (including sensor) when runs, eats up a slot in some pool (either default pool or explicitly specified pool); essentially meaning that it takes up some resources.
    • For sensors, this is
      • wasteful: as a slot is consumed even when we are just waiting (doing no actual work
      • опасно: если в вашем рабочем процессе слишком много датчиков, которые переходят в обнаружение примерно в одно и то же время, они могут заморозить много ресурсов на некоторое время. На самом деле слишком многие, имеющие ExternalTaskSensors, печально известны тем, что помещают целые рабочие процессы (DAG) в взаимоблокировки
    • To overcome this problem, Airflow v1.10.2 introduced modes in sensors
      • mode='poke' (default) means the existing behaviour that we discussed above
      • mode='reschedule' означает, что после попытки проткнуть, а не в спящий режим, датчик будет вести себя так, как если бы он вышел из строя (в текущей попытке), и его статус изменится с RUNNING на UP_FOR_RETRY. Таким образом, он освобождает свой слот, позволяя другим задачам выполняться, пока он ожидает следующей попытки покинга.
    • Ссылка на соответствующий фрагмент здесь
    if self.reschedule:
        reschedule_date = timezone.utcnow() + timedelta(
            seconds=self._get_next_poke_interval(started_at, try_number))
        raise AirflowRescheduleException(reschedule_date)
    else:
        sleep(self._get_next_poke_interval(started_at, try_number))
        try_number += 1
    

А теперь отвечу на ваши вопросы напрямую

Q1

  1. Сенсор тыкает каждые 4 часа, и при каждом тычке будет ждать длительность таймаута (12 часов)?
  2. Или тыкает каждые 4 часа, итого 3 тыкает, потом таймаут?

пункт 2. правильный

2-й квартал

Кроме того, что произойдет с этими параметрами, если я использую mode = reschedule?

Как объяснялось ранее, каждый из этих параметров независим, и установка mode='reschedule' никоим образом не меняет их поведение.

person y2k-shubham    schedule 07.09.2020
comment
Спасибо за подробное объяснение. То, как работает параметр mode=reschedule, действительно похоже на логическое поведение по умолчанию, которое мы хотим, чтобы любой датчик имел нормально ... например. Если даг запускает 30 субдагов, каждый из которых отвечает за ETL файла данных, то 30 датчиков, выискивающих и ожидающих, кажутся действительно плохой идеей. - person Imad; 07.09.2020
comment
Я бы отдал это вам, @Aetos. Я думаю, что в первые дни они, возможно, не задумывались, что сенсоры используются так часто; и с годами пользователи и варианты использования Airflow значительно продвинулись вперед. Несмотря на это; mode='poke' по-прежнему представляет собой сильный случай (и фактически сводит к минимуму повторяющиеся накладные расходы на планирование), когда poke_interval мало (скажем, ~ 1 мин); Ребята из Astronomer.io говорят это "..'poke' mode: Use this mode if the expected runtime of the sensor is short or if a short poke interval is required.." - person y2k-shubham; 07.09.2020

BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout of 12 hours
  mode = "reschedule"
)

Допустим, критерии не соблюдены с первого взгляда. Таким образом, он снова запустится через 4 часа перерыва. Но рабочий слот будет освобожден во время ожидания, поскольку мы используем mode="reschedule".

Вот что я понял.

person Sanajaoba Thongram    schedule 12.10.2020