# DAG File Processing - DAG 파일이 Meta Database에 등록되는 과정
#apache-airflow #dag-file-processing #scheduler
***"Apache Airflow 2.10.4 버전을 기준으로 작성하였습니다."***
이 글은 Apache Airflow에서 새로 등록된 DAG 파일을 어떻게 처리하여 Meta Database에 저장하는지에 대해 설명하는 글입니다.
- 목차:
1. 뜯어보기로 결심한 이유
2. 실행 과정의 간략한 정리
3. 실행 과정을 코드레벨에서 분석해보기
4. 마치며
## 1. 뜯어보기로 결심한 이유
평소 DAG 파일이 DAGs 폴더에 등록되면, 어떠한 과정을 통해 Meta Database에 등록되고 실행 가능한 상태가 되는지 궁금해하였습니다.
저는 이런 궁금증이 생기면 바로 알아보는 성격이므로,
이번에도 시간을 들여 천천히 코드 저 아래로 내려가는 시간을 가져보았습니다.
Apache Airflow 프로젝트는 상당히 많은 기여가 발생하는 큰 프로젝트인 만큼, 드넓은 코드 베이스를 가지고 있으며, 각 class 간의 추상화 레벨이 높아 여러모로 헤맸던 시간이 많았습니다.
하지만, 결국 Meta Database에 DAG를 저장하는 로직을 찾아내었습니다.
따라서, 여러분은 제가 설명하는 코드의 흐름을 물 흐르듯 따라오시고, DAG를 저장하는 메인 로직을 같이 이해해보시면 좋을 것 같습니다.
## 2. 실행 과정의 간략한 정리
먼저, 우리가 작성한 DAG 파일이 Meta Database에 등록되는 간략한 과정을 설명드리고자 합니다.
DAG 파일은 DAGs 폴더에 저장되면, Scheduler에 의해 실행된 **DAG File Processing** 작업에 의해 Meta Database에 저장됩니다.
[Airflow의 공식 문서에서는 Dag File Processing](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dagfile-processing.html#dag-file-processing)을 다음과 같이 정의하고 있습니다.
> DAG File Processing refers to the process of turning Python files contained in the DAGs folder into DAG objects that contain tasks to be scheduled.
>
> There are two primary components involved in DAG file processing.
> The `DagFileProcessorManager` is a process executing an infinite loop that determines which files need to be processed, and the `DagFileProcessorProcess` is a separate process that is started to convert an individual file into one or more DAG objects.
한글로 바꾸어 설명드리면,
> **DAG File Processing**은 DAGs 폴더의 Python 파일을 분석하여 스케줄링하기 위한 DAG 객체로 만드는 작업입니다.
>
> 이 작업에는 어떤 파일을 처리할지 결정하는 무한 루프인 DagFileProcessorManager와 개별 파일을 DAG 객체로 변환하는 DagFileProcessorProcess로 구성되어 있습니다.
이로써 **DAG File Processing**이 어떤 작업인지 간략하게나마 알 수 있게 되었습니다.
그렇다면, **DAG File Processing**은 언제 실행될까요?
공식 문서를 더 읽어보면 알 수 있습니다.
> ... otherwise, starting the scheduler process (`airflow scheduler`) also starts the DagFileProcessorManager.
**DAG File Processing**의 주요 요소 중 하나인 **DagFileProcessorManager**는 `airflow scheduler` CLI를 통해 실행된다는 것을 확인할 수 있습니다.
### 설명을 요약하면 다음과 같습니다.
1. **DAG File Processing** 작업은 DAGs 폴더에 저장된 Python 파일 내부의 DAG 객체를 실행 가능한 형태로 만든다.
2. **DAG File Processing**에는 두 가지의 주요 요소가 있으며, 이는 **DagFileProcessorManager**와 **DagFileProcessorProcess**이다.
3. **DagFileProcessorManager**는 어떤 파일을 처리할지 결정하는 무한 루프를 실행하는 프로세스이며, **DagFileProcessorProcess**는 각 Python 파일을 분석하여 포함된 DAG 정보를 객체로 변환하는 작업을 수행한다.
4. **DAG File Processing** 작업은 Airflow CLI 명령 중 하나인 `airflow scheduler`에서 실행된다.
### 현재까지의 과정
> [!note]
> `airflow scheduler`에 의해 **DagFileProcessorManager**가 실행되고, 이후 Meta Database에 처리된 DAG가 저장됩니다.
> ```
> CLI(`airflow scheduler`) -> ... -> DagFileProcessorManager -> ... -> Meta Database
> ```
공식 문서를 좀 더 살펴보겠습니다.
문서에는 다음과 같은 다이어그램이 존재합니다.
이 다이어그램을 통해 DAG File Processing 작업을 보다 자세히 이해할 수 있습니다.
![[airflow-docs-dag-file-processing.png]]
크게 두 가지의 컴포넌트로 나뉘어 있는 것을 확인할 수 있으며,
DagFileProcessorManager의 실행과정 중 DagFileProcessorProcess가 호출되는 것을 알 수 있습니다.
위에서 설명했던 것과 같이, DagFileProcessorManager는 어떤 파일을 처리할지 결정하는 무한루프를 실행하는 프로세스이며, 이 프로세스가 지속적으로 실행되면서 DagFileProcessorProcess를 실행하는 것을 알 수 있습니다.
아래의 순서는 공식문서의 다이어그램을 풀어서 설명한 것입니다.
1. **DagFileProcessorManager**
1. 신규 파일 확인
2. 최근 처리된 파일 제외
3. Queue에 처리할 파일 경로 삽입
4. 처리할 파일들을 지정하여 DagFileProcessorProcess 실행
2. **DagFileProcessorProcess**
1. DagFileProcessorManager에 의해 전달된 파일을 하나씩 처리
2. 전달된 파일에 명시된 모듈들을 로드
3. 모듈들을 처리
4. **DagBag** 반환
3. 다시 **DagFileProcessorManager**
1. 처리 결과 수집
2. 통계치 로깅
위 내용을 미루어 보았을 때,
아마 Meta Database에 처리된 DAG를 저장하는 로직은 DagFileProcessorProcess의 내부에 있을 것으로 예상됩니다. DagBag을 반환하는 과정의 가까운 곳에 위치할 것 같네요.
아직, 어느 부분에서 DAG를 Meta Database에 저장한다는 것은 확인할 수 없었습니다.
따라서, 위 과정을 코드레벨에서 분석해보기로 했습니다.
## 3. DAG를 Meta Database에 저장하기까지의 실행 과정을 코드레벨로 살펴보기
> [!info]
> 실행과정의 코드를 전부 보여드리며 설명드리려 했으나,
> 생각보다 글이 지루해지는 것 같아 전부를 보여드리는 것이 아닌
> 실행 과정만 보여드리고 메인 로직에 대한 설명을 자세히 하는 형식으로 변경하였습니다.
>
> 만약 전체 코드를 보고 싶으시면, 각 영역에 걸어둔 링크를 통해 확인하시길 바랍니다(각 링크마다 해당 코드의 시작점에 해당하는 Line을 함께 넣은 디테일을 확인해주세요).
>
먼저, `airflow scheduler`를 진입점으로 시작하여, 최종 처리된 DAG가 Meta Database에 저장되는 코드까지의 과정을 살펴보도록 하겠습니다.
### 3-1. Scheduler에서 DAG File Processing 작업 실행
공식문서의 내용과 같이, DAG File Processing 작업은 Scheduler에 의해 실행됩니다.
문서의 내용을 미루어 보았을 때, "Scheduler는 DagFileProcessorManager를 통해 DAG File Processing 작업을 실행하는구나."라고 생각할 수 있습니다.
하지만 코드를 직접 들여다보니 실제로는 좀 더 세분화된 작업에 의해 실행되는 것을 확인할 수 있었습니다.
**Scheduler**에 의해 `SchedulerJobRunner`가 호출되고,
이 **Job Runner**를 통해 `DagFileProcessorAgent`가 호출된다는 것을 말이죠.
그리고, 이어서 `DagFileProcessorAgent`에 의해 `DagFileProcessorManager`가 호출된다는 것을 확인할 수 있었습니다.
해당 과정이 실행되는 순서를 코드로 나열해보았습니다(각 메서드에 포함된 파라미터들은 제외하였습니다. 이 또한 걸어둔 링크를 통해 확인해보시길 바랍니다).
1. **Scheduler**
- **"Start Airflow Scheduler"**
1. [`scheduler()`](https://github.com/apache/airflow/blob/2.10.4/airflow/cli/commands/scheduler_command.py#L53)
1. [`_run_scheduler_job()`](https://github.com/apache/airflow/blob/2.10.4/airflow/cli/commands/scheduler_command.py#L41)
2. [**SchedulerJobRunner**](https://github.com/apache/airflow/blob/2.10.4/airflow/jobs/scheduler_job_runner.py#L139)
- **"SchedulerJobRunner runs for a specific time interval and schedules jobs that are ready to run."**
1. [`_execute()`](https://github.com/apache/airflow/blob/2.10.4/airflow/jobs/scheduler_job_runner.py#L947)
4. [**DagFileProcessorAgent**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L104)
- **"Agent for DAG file processing."**
2. [`start()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L152)
1. [`_run_processor_manager()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L220)
5. **DagFileProcessorManager**
그렇다면, DAG를 Meta Database에 저장하는 로직은 어디에 있을까요?
**DagFileProcessorManager** 부터 쭉 따라가보도록 하겠습니다.
6. [**DagFileProcessorManager**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L238)
- **"Manage processes responsible for parsing DAGs."**
1. [`start()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L472)
1. [`_run_parsing_loop()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L555)
1. `hearbeat()`
1. `_heartbeat_manager()`
2. [`start_new_processes()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L1214)
7. [**DagFileProcessorProcess**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/manager.py#L1227)
- **"Runs DAG processing in a seperate process using DagFileProcessor."**
2. [`start()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L220)
1. [`_run_file_processor()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L142)
1. [`_handle_dag_file_processing()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L177)
8. **DagFileProcessor**
위 순서는 공식문서의 내용과 일치합니다.
DagFileProcessorManger의 `start()`를 시작으로, 그 안에서 `_run_parsing_loop()`를 호출하고, `_run_parsing_loop()`의 내부에서 `heartbeat()`를 호출합니다.
`heartbeat()`는 `heartbeat_manager()`를 호출하고, `heartbeat_manager()`는 또 다시 `start()`를 호출합니다.
이로써 공식문서의 내용인 **==DagFileProcessorManager는 어떤 파일을 처리할지 결정하는 무한루프 프로세스==** 라는 것을 코드에서 확인할 수 있었습니다.
**DagFileProcessorManager**의 Process files 작업은 **DagFileProcessorProcess**을 호출을 통해 수행됩니다.
이어서 코드로 확인해보겠습니다.
9. [ **DagFileProcessor**](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L415)
- **"Process a Python file containing Airflow DAGs."**
1. [`process_file()`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L889)
1. [`dagbag = DagFileProcessor._get_dagbag(cls, file_path)`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L880)
2. [`save_dag_to_db(dags=dagbag.dags, dag_directory, ...)`](https://github.com/apache/airflow/blob/2.10.4/airflow/dag_processing/processor.py#L976)
10. [**DagBag**](https://github.com/apache/airflow/blob/2.10.4/airflow/models/dagbag.py#L99)
- **"A dagbag is a collection of dagsa, parsed out of a folder tree and has high level configuration settings."**
1. [`_sync_to_db(dags=dags, processor_subdir=dag_directory, ...)`](https://github.com/apache/airflow/blob/2.10.4/airflow/models/dagbag.py#L663)
1. [`_serialize_dag_capturing_errors(dag, session, processor_subdir)`](https://github.com/apache/airflow/blob/2.10.4/airflow/models/dagbag.py#L676)
- `_sync_to_db(dags)`를 통해 전달된 dags를 하나씩 순회하며 실행됩니다.
11. [**SerializedDagModel**](https://github.com/apache/airflow/blob/2.10.4/airflow/models/serialized_dag.py#L57)
- **"A table for serialized DAGs"**
1. [`write_dag(dag, min_update_interval, processor_subdir, ...)`](https://github.com/apache/airflow/blob/2.10.4/airflow/models/serialized_dag.py#L137)
12. **INSERT INTO Meta Database**
위와 같은 코드의 동작을 통해 최종적으로 DAG 파일이 Meta Database에 저장되는 메인 로직인 `write_dag()`가 실행됩니다.
### 3-2. DAG 파일을 Meta Database에 저장하는 코드 분석
메인 로직을 좀 더 자세히 뜯어보도록 하겠습니다.
```python
class SerializedDagModel(Base):
"""
A table for serialized DAGs.
serialized_dag table is a snapshot of DAG files synchronized by scheduler.
This feature is controlled by:
* ``[core] min_serialized_dag_update_interval = 30`` (s):
serialized DAGs are updated in DB when a file gets processed by scheduler,
to reduce DB write rate, there is a minimal interval of updating serialized DAGs.
* ``[scheduler] dag_dir_list_interval = 300`` (s):
interval of deleting serialized DAGs in DB when the files are deleted, suggest
to use a smaller interval such as 60
* ``[core] compress_serialized_dags``:
whether compressing the dag data to the Database.
It is used by webserver to load dags
because reading from database is lightweight compared to importing from files,
it solves the webserver scalability issue.
"""
...
@classmethod
@provide_session
def write_dag(
cls,
dag: DAG,
min_update_interval: int | None = None,
processor_subdir: str | None = None,
session: Session = NEW_SESSION,
) -> bool:
"""
Serialize a DAG and writes it into database.
If the record already exists, it checks if the Serialized DAG changed or not. If it is
changed, it updates the record, ignores otherwise.
...
"""
# Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
# If Yes, does nothing
# If No or the DAG does not exists, updates / writes Serialized DAG to DB
# 기준에 맞지 않다면 처리를 생략합니다.
if min_update_interval is not None:
if session.scalar(
select(literal(True)).where(
cls.dag_id == dag.dag_id,
(timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
)
):
return False
log.debug("Checking if DAG (%s) changed", dag.dag_id)
# cls의 dag와 processor_subdir에 대해 더 알아봐야 해석할 수 있는 구문이네요.
new_serialized_dag = cls(dag, processor_subdir)
# 새로 저장할 DAG와 비교하기 위해, 기존 DB에 저장된 레코드를 가져옵니다.
serialized_dag_db = session.execute(
select(cls.dag_hash, cls.processor_subdir).where(cls.dag_id == dag.dag_id)
).first()
# 새로 저장할 DAG와 앞서 가져온 기존의 레코드를 비교합니다.
if (
serialized_dag_db is not None
and serialized_dag_db.dag_hash == new_serialized_dag.dag_hash
and serialized_dag_db.processor_subdir == new_serialized_dag.processor_subdir
):
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False
log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
# 여기에서 최종적으로 저장이 되겠네요.
# 그렇다면, new_serialized_dag에 대해서 더 자세히 살펴볼까요?
session.merge(new_serialized_dag)
log.debug("DAG: %s written to the DB", dag.dag_id)
return True
```
#### 3-2-1. `new_serialized_dag` 더 자세히 알아보기
`SerializedDagModel.write_dag()`의 맨 마지막은 `session.merge(new_serialized_dag)`를 실행하는 것이었습니다.
```python
class SerializedDagModel(Base):
...
@classmethod
@provide_session
def write_dag(
cls,
dag: DAG,
min_update_interval: int | None = None,
processor_subdir: str | None = None,
session: Session = NEW_SESSION,
) -> bool:
...
new_serialized_dag = cls(dag, processor_subdir)
...
session.merge(new_serialized_dag)
```
`session.merge()`에 사용되는 new_serialized_dag가 어떤 것인지 알아야, 비로소 DAG가 Meta Database에 저장되는 과정을 이해할 수 있을 것 같습니다.
그렇다면, `new_serialized_dag`는 어떤 값을 가지고 있을까요? 한번 찾아가보겠습니다.
우선 `cls()`에 대해 알아보겠습니다.
이 함수는 상위 함수인 `write_dag(cls, ...)`를 경유하여 전달된 값입니다.
쉽게 설명하면, 함수가 속한 class 객체에 접근할 수 있도록 해주는 기능입니다.
따라서 아래의 코드와 같이 동작합니다.
```python
new_serialized_dag = SerializedDagModel(dag, processor_subdir)
```
그렇다면 이렇게도 볼 수 있겠죠.
```python
SerializedDagModel.__init__(dag, processor_subdir)
```
자, 이제 `SerializedDagModel`의 내부로 들어가보겠습니다.
`SerializedDagModel`의 `__init__()`은 다음과 같이 정의되어 있습니다.
```python
class SerializedDagModel(Base):
...
def __init__(
self,
dag: DAG,
processor_subdir: str | None = None
) -> None:
self.dag_id = dag.dag_id
self.fileloc = dag.fileloc
self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
self.last_updated = timezone.utcnow()
self.processor_subdir = processor_subdir
dag_data = SerializedDAG.to_dict(dag)
dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")
self.dag_hash = md5(dag_data_json).hexdigest()
if COMPRESS_SERIALIZED_DAGS:
self._data = None
self._data_compressed = zlib.compress(dag_data_json)
else:
self._data = dag_data
self._data_compressed = None
self.__data_cache = dag_data
```
`__init__(dag, processor_dir)`에 전달된 파라미터와 이를 `__init__()`의 내부에서 어떻게 활용하는지 분석해보아야 합니다.
이 파라미터들은 어디에서 선언되어 넘어오는 것일까요?
이를 확인하기 위해 지금까지 실행된 코드의 순서를 역순으로 쫓아가보았습니다.
#### 3-2-2. `dag: DAG`
`dag`의 실제 값을 알아보기 위해 코드의 실행을 역순으로 쫓아가보았습니다.
1. `SerializedDagModel.__init__(dag, ...)`
2. `SerializedDagModel.write_dag(dag, ...)`
3. `DagBag._serialized_dag_capturing_errors(dag, ...)`
4. `DagBag._sync_to_db(dags, ...)`
5. `DagFileProcessor.save_dag_to_db(dags=dagbag.dags)`
6. `dagbag = DagFileProcessor._get_dagbag(cls, file_path)`
위 실행순서를 보았을 때, dagbag에 대해서 알아야 이해할 수 있을 것 같습니다.
따라서 6번의 dagbag을 생성하기 위한 `DagFileProcessor의 _get_dagbag(...)`에 대해 살펴보도록 하겠습니다.
```python
# airflow/dag_processing/processor.py
class DagFileProcessor
...
@classmethod
def _get_dagbag(cls, file_path: str):
try:
return DagBag(file_path, include_examples=False)
...
```
`_get_dag()`는 단순히 `DagBag`을 반환하는 함수입니다.
그렇다면 `DagBag`은 어떤 정보들을 가지고 있을까요? 이어서 확인해보겠습니다.
```python
# airflow/models/dagbag.py
class DagBag(LoggingMixin):
"""
A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings.
...
"""
```
#### 3-2-3. `processor_subdir: str`
작성중 :)
```scss
Scheduler()
└─> _run_scheduler_job()
└─> SchedulerJobRunner._execute()
└─> DagFileProcessorAgent.start()
└─> ...
```
> [!note] 정리
> 위 과정을 전부 나열하면 다음과 같습니다.
> ```
> Scheduler
> -> SchedulerJobRunner
> -> DagFileProcessorAgent
> -> DagFileProcessorManager
> -> DagFileProcessorProcess
> -> DagFileProcessor
> -> DagBag
> -> SerializedDagModel
> -> Meta Database(serialized_dag)
> ```
> 그리고 각각의 큰 역할을 묶으면 다음과 같이 표현할 수 있겠네요.
> ```
> [Scheduler] -> [DagFileProcessor] -> DagBag -> SerializedDagModel -> Meta Database
> ```
>
## 4. 마치며
Apache Airflow 오랫동안 Apache 재단의 프로젝트 최상위권을 차지하고 있는 매우 인기있는 프로젝트입니다.
아래의 그래프를 보시면, 이 프로젝트가 얼마나 많은 인기를 가지고 있는지 바로 이해하실 수 있습니다(이미 많은 Star 수를 가지고 있음에도 꾸준하게 우상향으로 나아가는 것을 볼 수 있습니다).
(25년 2월 기준 약 3만 9천개의 Star)

Apache Airflow는 이 만큼이나 많은 사람들이 관심을 가지고 지켜보는 프로젝트이며,
그에 따라 많은 능력자들에게 기여를 받고 있습니다.
업무를 하면서 프레임워크를 잘 사용하는 것도 물론 중요하지만,
단순한 사용을 넘어, 그 안의 동작을 이해하면 훨씬 더 깊이있는 프로그래밍 생활이 될 것입니다.
그리고 실제 동작을 이해한 상태에서 사용한다면, 디버깅을 빨리 하게 되거나 더 효율적으로 동작하는 코드를 작성할 수 있겠죠.
그렇기에,
저는 앞으로도 꾸준히 관심있는 오픈소스 프로젝트들의 코드베이스를 탐구할 예정입니다.
이런 습관을 지속한다면,
Airflow와 같은 대규모 프로젝트의 코드레벨에 직접 기여하거나,
더 나아가 직접 만들어 볼 수 있게 되지 않을까요? (Creator of something...)
아, 여기까지 읽으신 분들에게만 재미있는 정보를 공유드립니다.
혹시 여러분은 Apache Airflow의 초기 프로젝트명을 알고 계신가요?
재미로 Airflow 창시자분의 초기 Repositary를 들춰보던 중에 발견하였습니다.
==그 이름은 바로 **Flux**.==
아래의 코드에서 발견할 수 있었습니다.
([Apache Airflow Github Repositary의 첫 Commit 중 일부](https://github.com/apache/airflow/commit/1047940ca4363b04044c4963b9c88f7632746407#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R1))
![[apache-airflow-first_official_commit_by_mistercrunch.png]]
그리고 이 커밋을 작성한 User는 바로 Airflow의 Creator인 Maxime Beauchemin입니다([Github 닉네임은 mistercrunch](https://github.com/mistercrunch)).
이 분이 직접 커밋한 내용들을 보면, Airbnb와 관련된 프로젝트라는 것을 확인할 수 있습니다(Airflow는 Airbnb에서 만든 프로젝트로 알려져 있죠).
다음에는 2.10.5 버전으로 올라가면서 달라진 DAG File Processing에 대해 소개드릴 예정입니다.
(DAG Bundle이라는 기능이 생겼더라구요. 혹시 궁금하시다면, Apache Airflow의 프로젝트 방향을 논의하는 AIP(Airflow Improvement Proposal)의 [AIP-66: DAG Bundles & Parsing](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=294816356)에서 확인해보시길 바랍니다)
긴 글을 끝까지 읽어주셔서 정말 감사합니다!