Broken DAG: [/opt/bitnami/airflow/dags/dagname.py] Traceback (most recent call last): File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/urllib3/util/connection.py", line 85, in create_connection sock.connect(sa) File "/opt/bitnami/airflow/venv/lib/ python3.9/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout raise Airflow TaskTimeout(self.error_message) airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/bitnami/airflow/dags/dagname.py after 30.0s. Please take a look at these docs to improve your DAG import time: * https://airflow.apache.org/docs/apache-airflow/2.6.3/best-practices.html#top-level-python-code* https://airflow.apache.org/docs/apache-airflow/2.6.3/best-practices.html#reducing-dag-complexity, PID: 293903
다음과 같이 airflow 에러가 났다.
먼저 DagBag import timeout에 대해 알아보기위해 Dag File Processing 에 대해 알아보자.
-Dag File Processing이란?
참고: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dagfile-processing.html
DAG File Processing — Airflow Documentation
airflow.apache.org
DagFileProcessing은 Dag폴더에 포함된 python파일을 Dag객체로 변환하여 스케줄할 작업을 포함시키는 프로세스를 말한다.
DagFileProcessing의 두가지 주요 구성요소:
- DagFileProcessorManager: 처리해야할 파일을 결정하는 무한루프를 실행하는 프로세스
- DagFileProcessorProcess 개별파일을 하나이상의 Dag객체로 변환하기위해 시작되는 별도 프로세스
DagFileProcessorManager
-새파일을 확인: Dag가 마지막으로 수정된 후 경과시간> dag_dir_list_interval 이면 file paths list 업데이트
-최근에 처리된 파일제외: min_file_process_interval> 파일 최근수정시간 인 파일을 제외
-파일 경로 대기열에 추가: 발견한 파일을 경로 큐에 추가
-파일 처리: 파일마다 최대 parsing_process까지 새 DagFileProcessProcess시작
-결과수집: 완료된 Dag processor들의 결과수집
-통계기록: 통계 출력하고 dag_processing.total_parse_time을 보냄
DagFileProcessorProcess
-파일처리: dag_file_processor_timeout 안에 전체프로세스를 완료해야함
-Dag파일을 python모듈로 로드: dagbag_import_timeout안에 완료되어야함
-모듈처리: python모듈내에서 Dag객체를 찾음
-DagBag 리턴: Dag 객체 목록을 DagFileProcessorManager에게 리턴
즉, DagFileProcessManger가 새로 dag로 읽어들일 파일목록을 갱신하고 개별 파일을 DagFileProcessorProcess가 Dag객체로 변환해준다.
이 과정에서 dagbag_import_timeout 에러가 난 부분을 보면 결국 dag파일을 python모듈로 로드하는 시간이 너무 길었던것이다.
*dag file processing시간확인 방법
/airflow/logs/dag_processor_manager 경로에서 dag_processor_manager.log 확인
원인:
dagname.py라는 dag파일에서 import하는 유저가 만든 패키지가 init파일에서 특정 db connection을 시도한다(왜..)
그런데 이 connection이 timeout나면서, module로 로드하는 대기시간이 길어졌음
'airflow' 카테고리의 다른 글
custom xcom + taskflow + multi task return 케이스 (0) | 2024.04.14 |
---|---|
airflow configuration 주요 파라미터 (0) | 2024.03.19 |
DAG ~ seems to be missing from DagBag (0) | 2024.03.13 |
airflow taskflow (0) | 2024.03.06 |