Динамическое сопоставление задач (DTM) — это важная функция, которая обеспечивает большую гибкость при создании групп обеспечения доступности баз данных. Однако он не обеспечивает бесконечной гибкости и не освобождает вас от привязанности к шаблонам Airflow. Например, ваши сопоставления задач ограничены типами данных, поддерживаемыми XCom, а именно Python dict и lists. Например, я не нашел способа динамически установить секрет Kubernetes для KubernetesPodOperator.

Python позволяет вам мариновать такой объект, как секрет Kubernetes, который преобразует такой объект, как этот, в поток байтов, и XCom действительно поддерживает консервированные данные, но я не нашел способа использовать это в сочетании с DTM.

Вот несколько полезных советов и наблюдений, собранных во время нашей работы с DTM в Редактикс:

Работа с глобальными переменными

Учтите, что значения, установленные внутри функций сопоставления задач, устанавливаются во время выполнения после того, как значения присваиваются переменным вне этих сопоставленных функций. Поэтому вы не можете, например, присвоить значение глобальной переменной внутри сопоставления задачи и ожидать, что это значение будет доступно вне этих сопоставленных функций:

secrets = []

@task()
def set_vars(**context):
    global secrets
    secrets.append(Secret('volume', "/secretpath/" + context["params"]["secretname"], context["params"]["secretname"]))
    return secrets

@task()
def init_wf(secrets, **context):
    print(secrets)
    return "hello"

init_workflow = init_wf.partial().expand(
    secrets=set_vars()
)

start_job = KubernetesPodOperator(
    task_id="start-job",
    image="postgres:12"
    cmds=["uname"],
    secrets=secrets ### this value is going to be null
    )
init_workflow >> start_job

Значение secrets в KubernetesPodOperator будет нулевым, поскольку к моменту инициализации start_job set_vars еще не запущено.

Вы можете дублировать свои DAG, чтобы они работали в другом контексте, например. другое расписание

Не самый красивый шаблон, но вы можете продублировать свои DAG, например, с помощью скрипта точки входа Docker:

#!/bin/bash

arr=( "workflow1" "workflow2" )

for workflow_id in "${arr[@]}"
do
    cp /tmp/dag-template.py /opt/airflow/dags/${workflow_id}-mydag.py
done

Затем ваши группы обеспечения доступности баз данных могут получить конфигурацию из API, переменных среды, файлов или чего-то еще, что наиболее целесообразно, чтобы передать эти варианты в вашу группу обеспечения доступности баз данных. Это может показаться очевидным и уж точно некрасивым, но нам пришлось смириться с этим, потому что нам нужен был доступ к определенным параметрам (например, графику), а это не входило в рамки DTM.

Если вы решите получать некоторые значения через API, это позволит еще большей части DAG быть динамичной, чтобы ее не нужно было обновлять всякий раз, когда вы хотите изменить значения. Мы решили использовать это workflow_id в имени файла для передачи в API:

dag_file = os.path.basename(__file__).split('.')[0]
dag_id = dag_file.replace('-mydag','')

API_KEY = os.environ['API_KEY']
API_HOST = "https://api.yourthing.com"
headers = {'Content-type': 'application/json', 'Accept': 'text/plain', 'x-api-key': API_KEY}
apiUrl = API_HOST + '/airflowconfigs/' + dag_id
request = requests.get(apiUrl, headers=headers)
wf_config = request.json()

Выполнение этого в верхней части DAG гарантирует, что wf_config будет доступна в качестве глобальной переменной во всей вашей DAG. Вы можете контролировать, как часто будет опрашиваться ваш API, и если вас беспокоит, как это масштабируется, кэшируйте эти конфигурации с помощью Redis.

Для доступа к объекту контекста, включая параметры DagRun, требуется API TaskFlow.

Например, если вы используете Airflow REST API и передаете объект конфигурации в конечную точку DAGRun, вы не можете получить доступ к этим аргументам из оператора стиля classic, такого как PythonOperator. Вместо этого вы должны использовать API TaskFlow, предназначенный для использования с DTM. Например:

@task()
def start_job(**context):
    print(context["params"]["myparam"])