[Airflow 파먹기] airflow dags list

1. ActionCommand 살펴보기

A. airflow dags list

# airflow/cli/cli_config.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/cli_config.py#L2006-L2011
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/cli_config.py#L1110-L1115

core_commands: list[CLICommand] = [
    GroupCommand(
        name="dags",
        help="Manage DAGs",
        subcommands=DAGS_COMMANDS,
    ),
    
...
    
    DAGS_COMMANDS = (
        ...
    ActionCommand(
        name="list",
        help="List all the DAGs",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags"),
        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
    ),
        ...
    )

이전에 봤던 것과 마찬가지로 lazy_load_command를 통해 ~.dag_list_dags를 load한다.

이 cli command는 모든 dags를 보여준다.

cli command를 실행하면 아래와 같은 출력을 보여준다. (원래 default로 설치되는 dag가 이렇게 많았었나..?)

B. dag_list_dags

# airflow.commands.dag_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/dag_command.py#L324-L346

from airflow.utils import cli as cli_utils, timezone

@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_dags(args) -> None:
    """Displays dags with or without stats at the command line."""
    dagbag = DagBag(process_subdir(args.subdir))
    if dagbag.import_errors:
        from rich import print as rich_print

        rich_print(
            "[red][bold]Error:[/bold] Failed to load all files. "
            "For details, run `airflow dags list-import-errors`",
            file=sys.stderr,
        )
    AirflowConsole().print_as(
        data=sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id")),
        output=args.output,
        mapper=lambda x: {
            "dag_id": x.dag_id,
            "filepath": x.filepath,
            "owner": x.owner,
            "paused": x.get_is_paused(),
        },
    )
a. import에 alias가 2개 붙을 수 있나?

특이한 점은 decorator가 2개가 붙어있다는 점이다. decorator부터 까보자.

decorator를 까보기 위해 cli_utils가 어떤건지 확인해보았다. 하나의 import에 alias가 2개가 있을 수 있나? 🤔

실제로 alias를 2개 붙인 것은 아니였다. from airflow.utils import cli as cli_utilsfrom airflow.utils import timezone을 한 문장으로 작성한 것이였다. (이게 바로 신텍스 슈거?)

b. action_cli decorator
# airflow.utils.cli.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/utils/cli.py#L61-L124

def action_cli(func=None, check_db=True):
    def action_logging(f: T) -> T:
        """
        decorates function to execute function at the same time submitting action_logging
        but in cli context. it will call action logger callbacks twice,
        one for pre-execution and the other one for post-execution.

        Action logger will be called with below keyword parameters:
                ...

        :param f: function instance
        :return: wrapped function
        """

        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            """
            An wrapper for cli functions. It assumes to have Namespace instance
            at 1st positional argument

                    ...
            """
            _check_cli_args(args)
            metrics = _build_metrics(f.__name__, args[0])
            cli_action_loggers.on_pre_execution(**metrics)
            verbose = getattr(args[0], "verbose", False)
            root_logger = logging.getLogger()
            if verbose:
                root_logger.setLevel(logging.DEBUG)
                for handler in root_logger.handlers:
                    handler.setLevel(logging.DEBUG)
            try:
                # Check and run migrations if necessary
                if check_db:
                    from airflow.configuration import conf
                    from airflow.utils.db import check_and_run_migrations, synchronize_log_template

                    if conf.getboolean("database", "check_migrations"):
                        check_and_run_migrations()
                    synchronize_log_template()
                return f(*args, **kwargs)
            except Exception as e:
                metrics["error"] = e
                raise
            finally:
                metrics["end_datetime"] = datetime.utcnow()
                cli_action_loggers.on_post_execution(**metrics)

        return cast(T, wrapper)

    if func:
        return action_logging(func)
    return action_logging

action_logging을 제출하는 동시에 cli 컨텍스트에서 함수를 실행하도록 한다. 액션 로거 콜백을 두 번 호출하는데, 사전 실행용, 사후 실행용이다.

일단 decorator에 대해서 좀 더 찾아보자. python decorator - coding dojang, Python deocrator - Daleseo

decoratro가 뭔지는 대략적으로 알겠다.

여기서도 궁금증이 셍긴다.

  1. 어떻게 action_cli()에 아무런 인자도 주지 않았는데 funcdag_list_dags 함수가 들어가는걸까?
  2. return cast(T, wrapper)에서 wrapper는 이미 callable인데 왜 CAST로 감싸준걸까?
  3. decorator는 어떻게 @만으로 호출되는걸까?

1번과 3번에 대한 답을 찾아가기 위해서는 CPython을 까봐야 정확히 알 수 있을 것 같다. (이건 위시리스트에 넣어두기...)

그래도 decorator의 탄생(?)은 PEP 318 – Decorators for Functions and Methods 에서 시작되었다.

그 외에 decorator가 어떻게 @와 매핑되는지도 궁금해서 코드를 찾아보고 싶었으나, 정확하게 찾지는 못했다. Implementation for PEP 318 using syntax J2, Implementation for PEP 318 ([as classmethod] version). cpython

결론은 정확히 어떤 코드로 동작하는지 확인하려면 CPython까지 가야한다는 것이다. (위시리스트로...)

CPython 코드를 까보는 것은 파이썬에 switch문 넣기 : 새 구문을 만들면서 배우는 파이썬 내부 에서 도움을 얻을 수 있다.

CPython에 대한 책은 cpython-internals-book

c. DagBag
# airflow.cli.commands.dag_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/dag_command.py#L324-L329

def dag_list_dags(args) -> None:
    """Displays dags with or without stats at the command line."""
    dagbag = DagBag(process_subdir(args.subdir))
        ...
# airflow.models.dagbag.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/models/dagbag.py#L73-L91

class DagBag(LoggingMixin):
    """
    A dagbag is a collection of dags, parsed out of a folder tree and has high
    level configuration settings, like what database to use as a backend and
    what executor to use to fire off tasks. This makes it easier to run
    distinct environments for say production and development, tests, or for
    different teams or security profiles. What would have been system level
    settings are now dagbag level so that one system can run multiple,
    independent settings sets.

    :param dag_folder: the folder to scan to find DAGs
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
        If ``False`` DAGs are read from python files.
    :param load_op_links: Should the extra operator link be loaded via plugins when
        de-serializing the DAG? This flag is set to False in Scheduler so that Extra Operator links
        are not loaded to not run User code in Scheduler.
    """

DagBag이 뭐지?

folder tree에서 파싱된 DAGs의 모음. 어떤 db를 백엔드로 사용할지, 어떤 executor를 사용하여 작업을 실행할지 등 높은 수준의 구성 설정이 있다.

DagBag이 엄청 중요한 것은 알겠는데, 뭔가 확 와닿지 않는다.

# GPT Plus

Airflow는 데이터 파이프라인을 스케줄링하고 모니터링하는데 사용되는 오픈소스 플랫폼입니다. Airflow는 데이터 처리 작업을 "DAG(Directed Acyclic Graph)"라는 형태로 정의하게 해줍니다. 이 DAG는 여러 개의 태스크가 종속성을 가지고 연결되어 있는 구조입니다.

DAGBag은 Airflow에서 사용되는 핵심 개념 중 하나로, 모든 DAG의 컨테이너 역할을 합니다. DAGBag는 Airflow의 DAG 파일이 있는 디렉토리를 스캔하고 파이썬 모듈로부터 DAG 객체를 로드하는 역할을 합니다. 이렇게 로드된 DAG들은 Airflow 웹 인터페이스에서 사용자들이 볼 수 있습니다.

DAGBag은 Airflow가 DAG를 효과적으로 관리하고 처리할 수 있도록 도와주는 도구입니다. 특히 여러 DAG가 존재하고 이들 사이에 복잡한 의존성이 있는 경우, DAGBag는 이러한 복잡성을 관리하는데 도움을 줍니다.


# Bard

Airflow dagbag은 Airflow가 DAG를 저장하고 관리하는 메모리 내 데이터베이스입니다. DAGbag에는 DAG의 이름, 정의, 상태, 의존성 등이 포함되어 있습니다. Airflow는 DAGbag을 사용하여 DAG를 실행하고, 모니터링하고, 관리합니다.

DAGbag은 Airflow scheduler가 DAG를 실행하기 전에 생성됩니다. scheduler는 DAGbag을 사용하여 DAG의 상태를 추적하고, DAG가 실행되는 순서를 결정하고, DAG가 실패한 경우 DAG를 다시 실행합니다.

DAGbag은 Airflow web UI에서도 사용됩니다. web UI는 DAGbag을 사용하여 DAG를 목록으로 표시하고, DAG의 상태를 확인하고, DAG를 실행하고, DAG를 중지할 수 있습니다.

DAGbag은 Airflow의 필수 구성 요소입니다. Airflow는 DAGbag 없이 DAG를 실행하거나 모니터링하거나 관리할 수 없습니다.

DAG에 관련된 대부분의 일을 하는 것이 DagBag인가 보다.

d. dagbag.import_errors
# airflow.cli.commands.dag_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/dag_command.py#L324-L336

def dag_list_dags(args) -> None:
    """Displays dags with or without stats at the command line."""
    dagbag = DagBag(process_subdir(args.subdir))
    if dagbag.import_errors:
        from rich import print as rich_print

        rich_print(
            "[red][bold]Error:[/bold] Failed to load all files. "
            "For details, run `airflow dags list-import-errors`",
            file=sys.stderr,
        )
                ...

import_error가 있다면 아래 사진처럼 빨간색으로 표시된다. (pprint, rich.print 등등 print도 종류가 참 많은 것 같다.)

여기서는 rich.print를 사용해서 색을 표현했는데, standalone에서는 print_output 메소드를 만들어서 사용했다. 개발자마다 작성하는 코드가 다 제각각인 것 같다.

dagbag.import_errors는 아래 flow를 통해 수집된다.

DagBag.__init__() >> collect_dags >> process_file >> _load_modules_from_file >> parse
e. AirflowConsole()
# airflow.cli.commands.dag_commands.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/dag_command.py#L324-L346

def dag_list_dags(args) -> None:
    ...
    AirflowConsole().print_as(
        data=sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id")),
        output=args.output,
        mapper=lambda x: {
            "dag_id": x.dag_id,
            "filepath": x.filepath,
            "owner": x.owner,
            "paused": x.get_is_paused(),
        },
    )

AirflowConsole로 dags를 print하는 구문같다.

dagbag.dagsdagbag.import_errors와 비슷한 flow로 수집된다.

DagBag.__init__() >> collect_dags >> process_file >> _process_modules >> bag_dag >> _bag_dag

output의 default value가 table인데, table에 값을 추가하는 방식도 신기하다.

이 글이 도움이 되었나요?

신고하기
0분 전
작성된 댓글이 없습니다. 첫 댓글을 달아보세요!
    댓글을 작성하려면 로그인이 필요합니다.