[Airflow 파먹기] airflow standalone

1. ActionCommand 살펴보기

A. airflow standalone

Airflow CLI command - standalone

# airflow.cli.cli_config.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/cli_config.py#L2210-L2215

ActionCommand(
        name="standalone",
        help="Run an all-in-one copy of Airflow",
        func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
        args=tuple(),
),

위에서 lazy_load_command()의 파라미터를 import하는 방식인 것을 파악했으니, standalone 모듈로 들어가보자.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L304

standalone = StandaloneCommand.entrypoint

entrypoint로 연결되어 있다.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L40-L50

class StandaloneCommand:
    """
    Runs all components of Airflow under a single parent process.

    Useful for local development.
    """

    @classmethod
    def entrypoint(cls, args):
        """CLI entrypoint, called by the main CLI system."""
        StandaloneCommand().run()

여기서 드는 2가지 의문이 생긴다.

  1. @staticmethod를 사용하지 않고 @classmethod를 사용했을까?
  2. entrypoint만 분리한 함수를 만들지 않은 이유는 뭘까?

1번은 파이썬을 더 공부하면서 알아가야할 패턴으로 보이고, 2번은 Add Airflow Standalone command #15826 에서 확인했을 때, 딱히 명확한 이유를 찾지 못했다.

# docs.apache-airflow.start.rst
# https://github.com/apache/airflow/blob/v2-6-stable/docs/apache-airflow/start.rst

airflow db init
airflow users create \
        --username admin \
        --firstname Peter \
        --lastname Parker \
        --role Admin \
        --email spiderman@superhero.org
# start the web server, default port is 8080
airflow webserver --port 8080
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler

StandaloneCommand().run()이 실행되면 위 CLI 명령이 순차적으로 실행된다.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L59-L118

def run(self):
        """Main run loop."""
        self.print_output("standalone", "Starting Airflow Standalone")
        # Silence built-in logging at INFO
        logging.getLogger("").setLevel(logging.WARNING)
        # Startup checks and prep
        env = self.calculate_env()
        self.initialize_database()
        # Set up commands to run
        self.subcommands["scheduler"] = SubCommand(
                ...
        )
        self.subcommands["webserver"] = SubCommand(
            ...
        )
        self.subcommands["triggerer"] = SubCommand(
            ...
        )

        self.web_server_port = conf.getint("webserver", "WEB_SERVER_PORT", fallback=8080)
        # Run subcommand threads
        ...
        self.print_output("standalone", "Complete")
a. print_output()

특이한게, logger를 사용해서 console에 찍은 것이 아닌 print를 사용했다.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L129-L143

    def print_output(self, name: str, output):
        """
        Prints an output line with name and colouring.

        You can pass multiple lines to output if you wish; it will be split for you.
        """
        color = {
            "webserver": "green",
            "scheduler": "blue",
            "triggerer": "cyan",
            "standalone": "white",
        }.get(name, "white")
        colorised_name = colored("%10s" % name, color)
        for line in output.split("\n"):
            print(f"{colorised_name} | {line.strip()}")

log 찍을때는 색칠하는 것은 안됬던가.. formatter에 색 관련된 것은 없었던 것 같기도 하다.

단순히 색깔때문이라기에는 뭔가 찝찝한데, doc string에도 colouring이라 적혀 있으니, 색깔때문인 것 같다.. 차라리 print_colouring_output() 로 했으면 더 좋았을지도 모르겠다. 이 부분은 내가 코드를 더 많이 작성해봐야 어떤 네이밍이 더 좋을지를 알 수 있을 것 같다.

b. db init
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L65

        env = self.calculate_env()

executor가 k8s인지 체크하고 database에 따라 환경변수를 다르게 설정해준다.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L66

        self.initialize_database()
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L172-L211

    def initialize_database(self):
        """Makes sure all the tables are created."""
        # Set up DB tables
        self.print_output("standalone", "Checking database is initialized")
        db.initdb()
        self.print_output("standalone", "Database ready")
        # See if a user needs creating
        # We want a streamlined first-run experience, but we do not want to
        # use a preset password as people will inevitably run this on a public
        # server. Thus, we make a random password and store it in AIRFLOW_HOME,
        # with the reasoning that if you can read that directory, you can see
        # the database credentials anyway.
        from airflow.utils.cli_app_builder import get_application_builder

 ...

                # Store what we know about the user for printing later in startup
        self.user_info = {"username": "admin", "password": password}

db.initdb()는 아래 db CLI command에서 다룰 것이라서 넘어간다.

initdb()를 한 다음에 admin user가 존재하지 않으면 admin user를 생성한다. 패스워드는 random으로 정해서 standalone_admin_password.txt파일로 저장한다.

admin user는 존재하고, standalone_admin_password.txt 파일이 존재하면 파일에서 패스워드를 읽어서 password 객체에 저장한다.

standalone_admin_password.txt 파일이 없는 경우에는 password 객체에 None을 저장한다.

아마 새로운 패스워드 파일을 생성해주지 않은 이유는 admin 계정이라 함부로 패스워드를 재발급하지 않으려는 의도인 것 같다.

c. scheduler, webserver, triggerer
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L68-L85

        # Set up commands to run
        self.subcommands["scheduler"] = SubCommand(
            self,
            name="scheduler",
            command=["scheduler"],
            env=env,
        )
        self.subcommands["webserver"] = SubCommand(
            self,
            name="webserver",
            command=["webserver"],
            env=env,
        )
        self.subcommands["triggerer"] = SubCommand(
            self,
            name="triggerer",
            command=["triggerer"],
            env=env,
        )

실행할 명령어를 subcommands 딕셔너리 객체에 할당하고 starts()를 통해 스레드를 실행시킨다.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L271-L300

class SubCommand(threading.Thread):
    """
    Execute a subcommand on another thread.

    Thread that launches a process and then streams its output back to the main
    command. We use threads to avoid using select() and raw filehandles, and the
    complex logic that brings doing line buffering.
    """

    def __init__(self, parent, name: str, command: list[str], env: dict[str, str]):
        super().__init__()
        self.parent = parent
        self.name = name
        self.command = command
        self.env = env

    def run(self):
        """Runs the actual process and captures it output to a queue."""
        self.process = subprocess.Popen(
            ["airflow"] + self.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=self.env,
        )
        for line in self.process.stdout:
            self.parent.output_queue.append((self.name, line))

    def stop(self):
        """Call to stop this process (and thus this thread)."""
        self.process.terminate()

SubCommand 클래스로 이동해보면 Thread를 상속받은 것을 알 수 있다.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L89-L90

        # Run subcommand threads
        for command in self.subcommands.values():
            command.start()

각 component들을 스레드로 실행한다.

# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L93-L111

        while True:
            try:
                # Print all the current lines onto the screen
                self.update_output()
                # Print info banner when all components are ready and the
                # delay has passed
                if not self.ready_time and self.is_ready():
                    self.ready_time = time.monotonic()
                if (
                    not shown_ready
                    and self.ready_time
                    and time.monotonic() - self.ready_time > self.ready_delay
                ):
                    self.print_ready()
                    shown_ready = True
                # Ensure we idle-sleep rather than fast-looping
                time.sleep(0.1)
            except KeyboardInterrupt:
                break

키보드 인터럽트가 발생할 때까지 무한 루프에 들어가는데, 스레드로 돌아가는 component들의 output을 주기적으로 업데이트해준다.

python에서의 thread와 process는 한번 제대로 공부해봐야겠다. coroutine, asyncio 등에 대해서도 알고만 있었지 잘 몰랐었는데, 공부해봐야겠다.

이 글이 도움이 되었나요?

신고하기
0분 전
작성된 댓글이 없습니다. 첫 댓글을 달아보세요!
    댓글을 작성하려면 로그인이 필요합니다.