1. ActionCommand 살펴보기
A. airflow standalone
# airflow.cli.cli_config.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/cli_config.py#L2210-L2215
ActionCommand(
name="standalone",
help="Run an all-in-one copy of Airflow",
func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
args=tuple(),
),
위에서 lazy_load_command()가 파라미터로 받은 경로를 import하는 방식인 것을 파악했으니, standalone 모듈로 들어가보자.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L304
standalone = StandaloneCommand.entrypoint
entrypoint로 연결되어 있다.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L40-L50
class StandaloneCommand:
"""
Runs all components of Airflow under a single parent process.
Useful for local development.
"""
@classmethod
def entrypoint(cls, args):
"""CLI entrypoint, called by the main CLI system."""
StandaloneCommand().run()
여기서 2가지 의문이 든다.
- 왜 @staticmethod를 사용하지 않고 @classmethod를 사용했을까?
- entrypoint만 분리한 함수를 만들지 않은 이유는 뭘까?
1번은 파이썬을 더 공부하면서 알아가야 할 패턴으로 보이고, 2번은 PR "Add Airflow Standalone command #15826"을 확인해 봤지만 딱히 명확한 이유를 찾지 못했다.
# docs.apache-airflow.start.rst
# https://github.com/apache/airflow/blob/v2-6-stable/docs/apache-airflow/start.rst
airflow db init
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org
# start the web server, default port is 8080
airflow webserver --port 8080
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
StandaloneCommand().run()이 실행되면 위 CLI 명령들을 직접 입력하지 않아도 같은 작업(DB 초기화, admin 유저 생성, webserver·scheduler 실행)이 한 번에 수행된다.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L59-L118
def run(self):
"""Main run loop."""
self.print_output("standalone", "Starting Airflow Standalone")
# Silence built-in logging at INFO
logging.getLogger("").setLevel(logging.WARNING)
# Startup checks and prep
env = self.calculate_env()
self.initialize_database()
# Set up commands to run
self.subcommands["scheduler"] = SubCommand(
...
)
self.subcommands["webserver"] = SubCommand(
...
)
self.subcommands["triggerer"] = SubCommand(
...
)
self.web_server_port = conf.getint("webserver", "WEB_SERVER_PORT", fallback=8080)
# Run subcommand threads
...
self.print_output("standalone", "Complete")
a. print_output()
특이한 게, logger를 사용해서 console에 찍지 않고 print를 사용했다.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L129-L143
def print_output(self, name: str, output):
"""
Prints an output line with name and colouring.
You can pass multiple lines to output if you wish; it will be split for you.
"""
color = {
"webserver": "green",
"scheduler": "blue",
"triggerer": "cyan",
"standalone": "white",
}.get(name, "white")
colorised_name = colored("%10s" % name, color)
for line in output.split("\n"):
print(f"{colorised_name} | {line.strip()}")
log를 찍을 때는 색칠하는 것이 안 됐던가.. formatter에 색 관련된 것은 없었던 것 같기도 하다.
단순히 색깔 때문이라기에는 뭔가 찝찝한데, docstring에도 colouring이라 적혀 있으니 색깔 때문인 것 같다.. 차라리 print_colouring_output()으로 했으면 더 좋았을지도 모르겠다. 이 부분은 내가 코드를 더 많이 작성해봐야 어떤 네이밍이 더 좋을지를 알 수 있을 것 같다.
b. db init
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L65
env = self.calculate_env()
executor가 k8s인지 체크하고 database에 따라 환경변수를 다르게 설정해준다.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L66
self.initialize_database()
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L172-L211
def initialize_database(self):
"""Makes sure all the tables are created."""
# Set up DB tables
self.print_output("standalone", "Checking database is initialized")
db.initdb()
self.print_output("standalone", "Database ready")
# See if a user needs creating
# We want a streamlined first-run experience, but we do not want to
# use a preset password as people will inevitably run this on a public
# server. Thus, we make a random password and store it in AIRFLOW_HOME,
# with the reasoning that if you can read that directory, you can see
# the database credentials anyway.
from airflow.utils.cli_app_builder import get_application_builder
...
# Store what we know about the user for printing later in startup
self.user_info = {"username": "admin", "password": password}
db.initdb()는 아래 db CLI command에서 다룰 것이라서 넘어간다.
initdb()를 한 다음에 admin user가 존재하지 않으면 admin user를 생성한다. 패스워드는 random으로 정해서 standalone_admin_password.txt 파일로 저장한다.
admin user가 이미 존재하고 standalone_admin_password.txt 파일도 존재하면, 파일에서 패스워드를 읽어서 password 변수에 저장한다.
standalone_admin_password.txt 파일이 없는 경우에는 password 변수에 None을 저장한다.
아마 새로운 패스워드 파일을 생성해주지 않은 이유는 admin 계정이라 함부로 패스워드를 재발급하지 않으려는 의도인 것 같다.
c. scheduler, webserver, triggerer
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L68-L85
# Set up commands to run
self.subcommands["scheduler"] = SubCommand(
self,
name="scheduler",
command=["scheduler"],
env=env,
)
self.subcommands["webserver"] = SubCommand(
self,
name="webserver",
command=["webserver"],
env=env,
)
self.subcommands["triggerer"] = SubCommand(
self,
name="triggerer",
command=["triggerer"],
env=env,
)
실행할 명령어를 subcommands 딕셔너리 객체에 할당하고 start()를 통해 스레드를 실행시킨다.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L271-L300
class SubCommand(threading.Thread):
"""
Execute a subcommand on another thread.
Thread that launches a process and then streams its output back to the main
command. We use threads to avoid using select() and raw filehandles, and the
complex logic that brings doing line buffering.
"""
def __init__(self, parent, name: str, command: list[str], env: dict[str, str]):
super().__init__()
self.parent = parent
self.name = name
self.command = command
self.env = env
def run(self):
"""Runs the actual process and captures it output to a queue."""
self.process = subprocess.Popen(
["airflow"] + self.command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=self.env,
)
for line in self.process.stdout:
self.parent.output_queue.append((self.name, line))
def stop(self):
"""Call to stop this process (and thus this thread)."""
self.process.terminate()
SubCommand 클래스로 이동해보면 Thread를 상속받은 것을 알 수 있다.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L89-L90
# Run subcommand threads
for command in self.subcommands.values():
command.start()
각 component들을 스레드로 실행한다.
# airflow.cli.commands.standalone_command.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/cli/commands/standalone_command.py#L93-L111
while True:
try:
# Print all the current lines onto the screen
self.update_output()
# Print info banner when all components are ready and the
# delay has passed
if not self.ready_time and self.is_ready():
self.ready_time = time.monotonic()
if (
not shown_ready
and self.ready_time
and time.monotonic() - self.ready_time > self.ready_delay
):
self.print_ready()
shown_ready = True
# Ensure we idle-sleep rather than fast-looping
time.sleep(0.1)
except KeyboardInterrupt:
break
키보드 인터럽트가 발생할 때까지 무한 루프에 들어가는데, 스레드로 돌아가는 component들의 output을 주기적으로 업데이트해준다.
python에서의 thread와 process는 한번 제대로 공부해봐야겠다. coroutine, asyncio 등에 대해서도 알고만 있었지 잘 몰랐었는데, 공부해봐야겠다.
Ghost