Вот моя установка:
Кластер Kubernetes работает airflow
, который отправляет задание spark
в кластер Kubernetes, задание выполняется нормально, но контейнер должен умереть после выполнения задания, но они все еще там зависают.
- Настройка воздушного потока появляется на кластере K8S.
Dag
запечен в образе докера воздушного потока, потому что я каким-то образом не могу синхронизировать даги изs3
. По какой-то причинеcron
не запускается.- Отправляет задание
spark
в кластер K8S, и задание выполняется нормально. - Но теперь вместо того, чтобы умирать после выполнения и завершения работы, он все еще висит.
Вот моя SparkSubmitOperator
функция
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job_from_airflow',
conn_id='k8s_spark',
java_class='com.dom.rom.mainclass',
application='s3a://some-bucket/jars/demo-jar-with-dependencies.jar',
application_args=['300000'],
total_executor_cores='8',
executor_memory='20g',
num_executors='9',
name='mainclass',
verbose=True,
driver_memory='10g',
conf={
'spark.hadoop.fs.s3a.aws.credentials.provider': 'com.amazonaws.auth.InstanceProfileCredentialsProvider',
'spark.rpc.message.maxSize': '1024',
'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
'spark.kubernetes.container.image': 'dockerhub/spark-image:v0.1',
'spark.kubernetes.namespace' : 'random',
'spark.kubernetes.container.image.pullPolicy': 'IfNotPresent',
'spark.kubernetes.authenticate.driver.serviceAccountName': 'airflow-spark'
},
dag=dag,
)