airflow 5

custom xcom + taskflow + multi task return 케이스

우리는 s3(minIO)를 활용한 custom xcom을 구성하여 airflow를 사용중이다. 빅데이터를 처리하기위해서 task간 xcom을 통한 데이터 이동시, storage object에 pickle형태로 보관하도록 했다. 우리 airflow를 사용하는 유저가 airflow에 대한 이해도가 높지않은점을 고려해서 최대한 단순하게 사용할수있었으면 했다. task에서 return되는 변수는 pickle로 minIO의 custom xcom 버킷에 날짜별로 저장되게 되고, lifecycle 설정을 통해 생성된지 90일이 된 데이터는 삭제되도록했다. 이렇게하면 대규모 데이터 특히 pandas dataframe을 사용하는 유저가 많은데 이런 df를 쉽게 task간 전송이 가능해졌다. 그리고, 한개 df가 아닌 여..

airflow 2024.04.14

airflow configuration 주요 파라미터

간단하게 적어본 주요 airflow configuration 파라미터 airflow web에 hostname출력여부 expose_hostname= True metrics 사용 statsd_on= True custom xcom사용시 xcom_backend=plugins.airflow-plugin.custom_xcom.CustomXComBackendS3 - DagFileProcessManager관련(dag객체로 전환할 파일감지/제외) ## 300, 신규(또는 수정된)dag파일 scan주기 dag_dir_list_interval=120 기본적인 dag패치 loop주기(변경안된파일, 최근 수정된 파일은 list제외) min_file_process_interval=30 - DagFileProcessorProces..

airflow 2024.03.19

DAG ~ seems to be missing from DagBag

1.현상 DAG ~ seems to be missing from DagBag 특정 dag가 안보이는 문제가 생김 ->scheduler 로그 확인해봄 pq: could not resize shared memory segment "/PostgreSQL.2058389254" to 12615680 bytes: No space left on device 2.원인 airflow의 external database인 postgresql 3개 pod 의 shm size 확인 -> 1번 data pod의 shm이 full shm 64M 64M 176K 100% /dev/shm 3.조치 - postgresql statefulset의 memory limit 늘림 - memory사이즈에 이어서 shared memory 사이즈 ..

airflow 2024.03.13

airflow taskflow

airflow taskflow예시 다양한 종류의 데이터별로 쿼리를 수행한후 파싱한후에 일부는 db에 담아 continuous learning용도로 쓰고, 일부는 pivot해서 spotfire등의 visualization용도로 사용한다면? 빅데이터레이크에 직접적으로 여러 쿼리를 날릴경우에, 병렬수행을 하면 dag의 구동시간을 줄이고 효율적인 workflow가 될 수있을것이다. 이때 taskflow를 사용한 dag작성은 직관적이면서도 쉽게 동시성 코딩을 할 수 있다. 아래는 예시.. 자세한 설명은 귀찮으니 담에... from airflow.utils.dates import days_ago from airflow.decorators import dag, task, task_group import pandas ..

airflow 2024.03.06