TaskFlow 2

custom xcom + taskflow + multi task return 케이스

우리는 s3(minIO)를 활용한 custom xcom을 구성하여 airflow를 사용중이다. 빅데이터를 처리하기위해서 task간 xcom을 통한 데이터 이동시, storage object에 pickle형태로 보관하도록 했다. 우리 airflow를 사용하는 유저가 airflow에 대한 이해도가 높지않은점을 고려해서 최대한 단순하게 사용할수있었으면 했다. task에서 return되는 변수는 pickle로 minIO의 custom xcom 버킷에 날짜별로 저장되게 되고, lifecycle 설정을 통해 생성된지 90일이 된 데이터는 삭제되도록했다. 이렇게하면 대규모 데이터 특히 pandas dataframe을 사용하는 유저가 많은데 이런 df를 쉽게 task간 전송이 가능해졌다. 그리고, 한개 df가 아닌 여..

airflow 2024.04.14

airflow taskflow

airflow taskflow예시 다양한 종류의 데이터별로 쿼리를 수행한후 파싱한후에 일부는 db에 담아 continuous learning용도로 쓰고, 일부는 pivot해서 spotfire등의 visualization용도로 사용한다면? 빅데이터레이크에 직접적으로 여러 쿼리를 날릴경우에, 병렬수행을 하면 dag의 구동시간을 줄이고 효율적인 workflow가 될 수있을것이다. 이때 taskflow를 사용한 dag작성은 직관적이면서도 쉽게 동시성 코딩을 할 수 있다. 아래는 예시.. 자세한 설명은 귀찮으니 담에... from airflow.utils.dates import days_ago from airflow.decorators import dag, task, task_group import pandas ..

airflow 2024.03.06