[Airflow 파먹기] cli_parser

# airflow.__main__
# https://github.com/apache/airflow/blob/main/airflow/__main__.py

...
    parser = cli_parser.get_parser()
...

1. get_parser()

# airflow.cli.cli_parser.py
# https://github.com/apache/airflow/blob/main/airflow/cli/cli_parser.py#L93-L106

@lru_cache(maxsize=None)
def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
    """Creates and returns command line argument parser."""
    parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
    subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
    subparsers.required = True

    command_dict = DAG_CLI_DICT if dag_parser else ALL_COMMANDS_DICT
    subparser_list = command_dict.keys()
    sub_name: str
    for sub_name in sorted(subparser_list):
        sub: CLICommand = command_dict[sub_name]
        _add_command(subparsers, sub)
    return parser

@lru_cache는 뭘까? -> @lru_cache 메모리를 더 사용하는 대신 속도를 더 빠르게 하는 메모이제이션 데코레이터이다.

파이썬은 lru_cache 데코레이터를 통해 메모이제이션을 사용할 수 있다.

A. DefaultHelpParser()

# airflow.cli.cli_parser.py
# https://github.com/apache/airflow/blob/main/airflow/cli/cli_config.py#L58-L109

class DefaultHelpParser(argparse.ArgumentParser):
    """CustomParser to display help message."""

        ....
        def _check_value():
        ...
        
        def error():
        ...

argparse.ArgumentParser를 상속받는 클래스이다. overriding이 필요한 부분만 재정의해서 사용한다는 점에서 객체지향 프로그래밍의 장점을 느낄 수 있다.

ArgumentParser는 ArgumentParser에 정리가 잘 되어 있다. 저 글을 한번 읽고 코드를 보니 parser가 어떤 역할을 할지 감이 잡혔다. 추가로 공식 문서는 ArgumentParser - doc이다.

B. 다시 get_parser로 돌아와서

# airflow.cli.cli_parser.py
# https://github.com/apache/airflow/blob/main/airflow/cli/cli_parser.py#L93-L106

@lru_cache(maxsize=None)
def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
        ...
        
    command_dict = DAG_CLI_DICT if dag_parser else ALL_COMMANDS_DICT
    subparser_list = command_dict.keys()
    sub_name: str
    for sub_name in sorted(subparser_list):
        sub: CLICommand = command_dict[sub_name]
        _add_command(subparsers, sub)
    return parser

argument로 prog='airflow'인 것을 보아 airflow ~~이런식으로 cli 명령을 인식하는 것 같다.

subcommand는 필수인 것 같고 COMMAND는 command_dict를 통해 생성?해주는 것 같다.

ALL_COMMANDS_DICT를 타고 넘어가보면, core_commands를 할당받는 부분이 존재한다. core_commands에서 자주 사용하는 명령만 보면 아래와 같다.

아래에 작성된 코드는 Airflow CLI command에서 확인할 수 있다.

# airflow.cli.cli_config.py
# https://github.com/apache/airflow/blob/main/airflow/cli/cli_config.py#L2019-L2229

core_commands: list[CLICommand] = [
    GroupCommand(
        name="dags",
        help="Manage DAGs",
        subcommands=DAGS_COMMANDS,
    ),
    GroupCommand(
        name="tasks",
        help="Manage tasks",
        subcommands=TASKS_COMMANDS,
    ),
    GroupCommand(
        name="db",
        help="Database operations",
        subcommands=DB_COMMANDS,
    ),
    ActionCommand(
        name="webserver",
        help="Start a Airflow webserver instance",
        func=lazy_load_command("airflow.cli.commands.webserver_command.webserver"),
        args=(
            ARG_PORT,
            ARG_WORKERS,
            ARG_WORKERCLASS,
            ARG_WORKER_TIMEOUT,
            ARG_HOSTNAME,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_ACCESS_LOGFILE,
            ARG_ERROR_LOGFILE,
            ARG_ACCESS_LOGFORMAT,
            ARG_LOG_FILE,
            ARG_SSL_CERT,
            ARG_SSL_KEY,
            ARG_DEBUG,
        ),
    ),
    ActionCommand(
        name="scheduler",
        help="Start a scheduler instance",
        func=lazy_load_command("airflow.cli.commands.scheduler_command.scheduler"),
        args=(
            ARG_SUBDIR,
            ARG_NUM_RUNS,
            ARG_DO_PICKLE,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_SKIP_SERVE_LOGS,
            ARG_VERBOSE,
        ),
        epilog=(
            "Signals:\n"
            "\n"
            "  - SIGUSR2: Dump a snapshot of task state being tracked by the executor.\n"
            "\n"
            "    Example:\n"
            '        pkill -f -USR2 "airflow scheduler"'
        ),
    ),
    ActionCommand(
        name="triggerer",
        help="Start a triggerer instance",
        func=lazy_load_command("airflow.cli.commands.triggerer_command.triggerer"),
        args=(
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_CAPACITY,
            ARG_VERBOSE,
            ARG_SKIP_SERVE_LOGS,
        ),
    ),
    ...

여기서 아래 코드를 보고 구조가 조금 잘 짜여져 있다고 느꼈다.

아래 그림처럼 되어 있는데, 코드를 읽을때는 엄청 대단해보였는데, 막상 그림으로 그려보니 엄청 대단한 것 같지는 않다. 뭐든 상상만 할때는 거대해보이는 것 같다.

C. ActionCommand 살펴보기

a. lazy_load_command
# airflow.cli.cli_config.py
# https://github.com/apache/airflow/blob/main/airflow/cli/cli_config.py#L46-L56

def lazy_load_command(import_path: str) -> Callable:
    """Create a lazy loader for command."""
    _, _, name = import_path.rpartition(".")

    def command(*args, **kwargs):
        func = import_string(import_path)
        return func(*args, **kwargs)

    command.__name__ = name

    return command
lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags")

lazy loading이 뭘까? 어떤 느낌인지는 감이 오는데, 왜 lazy인지는 잘 모르겠다. (한번 gpt를 써볼까?)

# Bard

lazy_load_command 함수는 Airflow 명령줄 인터페이스(CLI)에 사용되는 함수입니다. 명령이 처음 호출될 때까지 명령을 로드하지 않기 때문에 "게으른" 로더입니다.

함수는 import_path 매개변수를 취하고 import_path에 해당하는 모듈을 로드하는 Callable을 반환합니다. 모듈이 처음 로드될 때 명령이 로드됩니다.

함수는 Airflow CLI에서 명령을 로드하는 데 유용합니다. 명령이 처음 호출될 때까지 명령을 로드하지 않기 때문에 메모리 사용을 줄이는 데 도움이 됩니다.

---
# Chat GPT Plus

일반적으로, 'lazy loading'은 객체의 초기화를 최대한 미루는 컴퓨터 프로그래밍 기술입니다. 이는 특히 메모리를 절약하고, 성능을 향상시킬 때 유용합니다. 이 방식을 따르면, "lazy_load_command"는 특정 커맨드를 실행하기 직전까지 해당 커맨드의 로드를 미룰 수 있습니다.

Chat GPT보다 Bard가 더 좋은 것 같기도 하고.. 흠..

어쨋든, lazy인 이유는 lazy_load_command()의 return으로 import한 결과를 반환하는 것이 아닌, 함수의 Callable을 반환하기 때문이다.

# airflow.cli.cli_parser.py
# https://github.com/apache/airflow/blob/main/airflow/cli/cli_parser.py#L135-L138

def _add_action_command(sub: ActionCommand, sub_proc: argparse.ArgumentParser) -> None:
    for arg in _sort_args(sub.args):
        arg.add_to_parser(sub_proc)
    sub_proc.set_defaults(func=sub.func)

즉, ArgumentParser에 CLI ActionCommand를 붙이는 시점에 import를 진행하는 것이 아니라. CLI command를 호출할 때 import가 진행되는 방식이라고 생각할 수 있다.

lazy_load_command()가 작성된 시점의 PR을 찾아보니 [AIRFLOW-6001] Lazy load CLI commands #6594를 찾을 수 있었다.

Currently, CLI loads all packages even when the command is not used in any way, which takes CPU time and memory. I suggest that only commands that are executed be loaded. This can speed up task launch time and also improve user productivity.

함수를 link(?)하는 시점에 import할 필요가 없다면, 호출하는 시점에 import하는 것이 메모리 차원에서 더 이득일 것이다. 비슷한 예로 함수 안에 import문을 작성하는 경우를 봤던 것 같다.

함수 안에 import하는 경우를 뭐라고 하는지 모르겠어서 GPT한테 물어봤더니 답이 다르다.. ㅋㅋ

# Bard

Python에서 함수 내부에 모듈을 가져오는 것을 "lazy import"라고 합니다. 즉, 함수가 실행될 때까지 모듈이 가져오지 않습니다. 이는 모듈이 필요하지 않은 경우 메모리 사용을 줄이는 데 도움이 될 수 있습니다.

----
# Chat GPT Plus

Python에서 특정 함수 내부에서 import 구문을 사용하는 경우를 "지역적인 임포트(Local Import)" 또는 "함수 내 임포트(Function-level Import)"라고 부릅니다.

이러한 방식은 일반적으로 코드의 가독성을 해칠 수 있으므로, 주의해서 사용해야 합니다. 하지만 다음과 같은 경우에는 유용할 수 있습니다:

모듈이 크고 메모리를 많이 사용하며, 해당 모듈이 특정 함수에서만 필요한 경우.
모듈의 로드 시간이 길어서 프로그램의 시작 시간에 영향을 미치는 경우.
특정 조건 하에서만 모듈이 필요한 경우.
또한, 이러한 방식은 순환적인 임포트(Circular Import) 문제를 방지하는 데에도 사용될 수 있습니다. 그러나 이런 상황은 가능하면 피하는 것이 좋습니다, 왜냐하면 그것은 코드의 구조가 적절하지 않을 수 있음을 나타내기 때문입니다.

Lazy import

여기서 끊고 각 CLI command를 파먹는 것은 다음 포스팅에서 다루려고 한다.

이 글이 도움이 되었나요?

신고하기
0분 전
작성된 댓글이 없습니다. 첫 댓글을 달아보세요!
    댓글을 작성하려면 로그인이 필요합니다.