Почему искры исполнителя не умирают

Вот моя установка:

Кластер Kubernetes работает airflow, который отправляет задание spark в кластер Kubernetes, задание выполняется нормально, но контейнер должен умереть после выполнения задания, но они все еще там зависают.

  1. Настройка воздушного потока появляется на кластере K8S.
  2. Dag запечен в образе докера воздушного потока, потому что я каким-то образом не могу синхронизировать даги из s3. По какой-то причине cron не запускается.
  3. Отправляет задание spark в кластер K8S, и задание выполняется нормально.
  4. Но теперь вместо того, чтобы умирать после выполнения и завершения работы, он все еще висит.

Вот моя SparkSubmitOperator функция

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job_from_airflow',
    conn_id='k8s_spark',
    java_class='com.dom.rom.mainclass',
    application='s3a://some-bucket/jars/demo-jar-with-dependencies.jar',
    application_args=['300000'],
    total_executor_cores='8',
    executor_memory='20g',
    num_executors='9',
    name='mainclass',
    verbose=True,
    driver_memory='10g',
    conf={
        'spark.hadoop.fs.s3a.aws.credentials.provider': 'com.amazonaws.auth.InstanceProfileCredentialsProvider',
        'spark.rpc.message.maxSize': '1024',
        'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
        'spark.kubernetes.container.image': 'dockerhub/spark-image:v0.1',
        'spark.kubernetes.namespace' : 'random',
        'spark.kubernetes.container.image.pullPolicy': 'IfNotPresent',
        'spark.kubernetes.authenticate.driver.serviceAccountName': 'airflow-spark'
        },
    dag=dag,
)

person devnull    schedule 17.09.2019    source источник
comment
Я не уверен, но, возможно, у вас такая же проблема, как описано здесь stackoverflow.com/questions/57964848/   -  person Yudovin Artsiom    schedule 18.09.2019
comment
На самом деле это моя проблема   -  person devnull    schedule 24.09.2019
comment
В этом случае этот ответ должен вам помочь.   -  person Yudovin Artsiom    schedule 24.09.2019


Ответы (1)


Я решил, что проблема в том, что это моя ошибка, я не закрывал spark session, добавил следующее

session.stop();
person devnull    schedule 24.09.2019