airflow

이슈)DagBag import timeout (airflow DagFile Processing)

DanielZZI 2024. 3. 19. 23:27
728x90

Broken DAG: [/opt/bitnami/airflow/dags/dagname.py] Traceback (most recent call last): File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/urllib3/util/connection.py", line 85, in create_connection sock.connect(sa) File "/opt/bitnami/airflow/venv/lib/ python3.9/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout raise Airflow TaskTimeout(self.error_message) airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/bitnami/airflow/dags/dagname.py after 30.0s. Please take a look at these docs to improve your DAG import time: * https://airflow.apache.org/docs/apache-airflow/2.6.3/best-practices.html#top-level-python-code* https://airflow.apache.org/docs/apache-airflow/2.6.3/best-practices.html#reducing-dag-complexity, PID: 293903

 

 

다음과 같이 airflow 에러가 났다.

 

먼저 DagBag import timeout에 대해 알아보기위해 Dag File Processing 에 대해 알아보자.

 

 


-Dag File Processing이란?
참고: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dagfile-processing.html

 

DAG File Processing — Airflow Documentation

 

airflow.apache.org

 

DagFileProcessing은 Dag폴더에 포함된 python파일을 Dag객체로 변환하여 스케줄할 작업을 포함시키는 프로세스를 말한다.

 

DagFileProcessing의 두가지 주요 구성요소:

- DagFileProcessorManager: 처리해야할 파일을 결정하는 무한루프를 실행하는 프로세스

- DagFileProcessorProcess 개별파일을 하나이상의 Dag객체로 변환하기위해 시작되는 별도 프로세스

 

 

DagFileProcessorManager

-새파일을 확인: Dag가 마지막으로 수정된 후 경과시간> dag_dir_list_interval 이면 file paths list 업데이트

-최근에 처리된 파일제외: min_file_process_interval> 파일 최근수정시간 인 파일을 제외 

-파일 경로 대기열에 추가: 발견한 파일을 경로 큐에 추가

-파일 처리: 파일마다 최대 parsing_process까지 새 DagFileProcessProcess시작

-결과수집: 완료된 Dag processor들의 결과수집

-통계기록: 통계 출력하고 dag_processing.total_parse_time을 보냄

 

DagFileProcessorProcess 

-파일처리: dag_file_processor_timeout 안에 전체프로세스를 완료해야함

-Dag파일을 python모듈로 로드: dagbag_import_timeout안에 완료되어야함

-모듈처리: python모듈내에서 Dag객체를 찾음

-DagBag 리턴: Dag 객체 목록을 DagFileProcessorManager에게 리턴

 

 

즉, DagFileProcessManger가 새로 dag로 읽어들일 파일목록을 갱신하고 개별 파일을 DagFileProcessorProcess가 Dag객체로 변환해준다.

 

이 과정에서 dagbag_import_timeout 에러가 난 부분을 보면 결국 dag파일을 python모듈로 로드하는 시간이 너무 길었던것이다.

 

*dag file processing시간확인 방법

/airflow/logs/dag_processor_manager 경로에서 dag_processor_manager.log 확인

 

원인:

dagname.py라는 dag파일에서 import하는 유저가 만든 패키지가 init파일에서 특정 db connection을 시도한다(왜..)

그런데 이 connection이 timeout나면서, module로 로드하는 대기시간이 길어졌음

 

'airflow' 카테고리의 다른 글

custom xcom + taskflow + multi task return 케이스  (0) 2024.04.14
airflow configuration 주요 파라미터  (0) 2024.03.19
DAG ~ seems to be missing from DagBag  (0) 2024.03.13
airflow taskflow  (0) 2024.03.06