data engineering 9

keydb

1. keydb vs redis기본적으로 keydb는 redis에서 fork되어 개선하고자 탄생했다.특히 keydb는 기존 redis에서 더 발전한 HA구성이 돋보인다.기존 reids처럼 master-slave 구성도 지원하지만,Active-Replica와 Multi-Master 이 두가지 특성이 Redis보다 발전된 load balancing 기능을 지원한다.이 경우 keydb에서 sentinel을 사용할 필요가 없어진다.(redis-sentinel-HA proxy 더이상 안써도되나!!!)Active-Replica:Active-Replica 모드는 하나의 마스터 노드와 여러 Replica노드를 허용하는 Redis의 일반적인 Master-Slave replica 모델을 개선한 것이다.Active-Repl..

custom xcom + taskflow + multi task return 케이스

우리는 s3(minIO)를 활용한 custom xcom을 구성하여 airflow를 사용중이다. 빅데이터를 처리하기위해서 task간 xcom을 통한 데이터 이동시, storage object에 pickle형태로 보관하도록 했다. 우리 airflow를 사용하는 유저가 airflow에 대한 이해도가 높지않은점을 고려해서 최대한 단순하게 사용할수있었으면 했다. task에서 return되는 변수는 pickle로 minIO의 custom xcom 버킷에 날짜별로 저장되게 되고, lifecycle 설정을 통해 생성된지 90일이 된 데이터는 삭제되도록했다. 이렇게하면 대규모 데이터 특히 pandas dataframe을 사용하는 유저가 많은데 이런 df를 쉽게 task간 전송이 가능해졌다. 그리고, 한개 df가 아닌 여..

minio scale out

distributed mode로 minIO의 lifecycle 정책이나, read/write가 잘반영이 안되고있었다. 운영환경의 minIO 이전 담당자가 설정을 조금 특이하게 했었다. 즉 MINIO_DISTRIBUTED_NODES = s3-minio-{0...3}.s3-minio-headless..sv.cluster.local -> 4개 headless서비스만 설정되어있는데, replica는 또 19개로 설정되어있었다. 실제로 4개 minIO pod 외 나머지 15개 pod는 접근이나 활용이 되지않고있었다. 이를 바로잡기 위해서 여러가지 시도를 해봤다. 0. statefulset scale=0 1.statefulset 수정 MINIO_DISTRIBUTED_NODES = s3-minio-{0...7}.s3..

airflow configuration 주요 파라미터

간단하게 적어본 주요 airflow configuration 파라미터 airflow web에 hostname출력여부 expose_hostname= True metrics 사용 statsd_on= True custom xcom사용시 xcom_backend=plugins.airflow-plugin.custom_xcom.CustomXComBackendS3 - DagFileProcessManager관련(dag객체로 전환할 파일감지/제외) ## 300, 신규(또는 수정된)dag파일 scan주기 dag_dir_list_interval=120 기본적인 dag패치 loop주기(변경안된파일, 최근 수정된 파일은 list제외) min_file_process_interval=30 - DagFileProcessorProces..

DAG ~ seems to be missing from DagBag

1.현상 DAG ~ seems to be missing from DagBag 특정 dag가 안보이는 문제가 생김 ->scheduler 로그 확인해봄 pq: could not resize shared memory segment "/PostgreSQL.2058389254" to 12615680 bytes: No space left on device 2.원인 airflow의 external database인 postgresql 3개 pod 의 shm size 확인 -> 1번 data pod의 shm이 full shm 64M 64M 176K 100% /dev/shm 3.조치 - postgresql statefulset의 memory limit 늘림 - memory사이즈에 이어서 shared memory 사이즈 ..

minIO lifecycle 적용

생성된지 하루가 지난 test라는 버킷의 모든 파일을(prefix="")를 삭제하는 lifecycle rule을 등록하는 스크립트 예시 from minio import Minio from datetime import datetime, timedelta from minio.commonconfig import ENABLED, Filter from minio.lifecycleconfig import Expiration, Rule, LifecycleConfig # MinIO 서버 정보 설정 minio_client = Minio( , access_key=, secret_key=, secure=False ) config = LifecycleConfig( [ Rule( ENABLED, rule_filter=Filte..

minIO셋업

minIO는 오브젝트 스토리지 매니지먼트툴이다. aws s3와 호환이된다. 오브젝트스토리지 매니지의 대표적은 오픈소스로, mlflow 와 함께 모델 버전관리에 쓰인다. 인풋데이터등 비정형데이터를 관리하기에 좋아 머신러닝에서 많이 쓰인다. 1.셋업 minio lifecycle정책을 집에서 테스트해볼일이 생겨서 간단하게 구축해보았다. -private git repostory에 helm index가 셋업된 helm registry가 있다고 가정 -private image registry가 있다고 가정 *주의: minIO 는 2021년 5월부터 AGPL 라이센스 v3가 되었다. 네트워크에 배포시 유관 소스코드 공개 필수 bitnami minIO helm chart를 활용했다. 먼저 values.yaml위에 덮어..

airflow taskflow

airflow taskflow예시 다양한 종류의 데이터별로 쿼리를 수행한후 파싱한후에 일부는 db에 담아 continuous learning용도로 쓰고, 일부는 pivot해서 spotfire등의 visualization용도로 사용한다면? 빅데이터레이크에 직접적으로 여러 쿼리를 날릴경우에, 병렬수행을 하면 dag의 구동시간을 줄이고 효율적인 workflow가 될 수있을것이다. 이때 taskflow를 사용한 dag작성은 직관적이면서도 쉽게 동시성 코딩을 할 수 있다. 아래는 예시.. 자세한 설명은 귀찮으니 담에... from airflow.utils.dates import days_ago from airflow.decorators import dag, task, task_group import pandas ..