[Airflow 파먹기] main - configuration

0

0

1. configuration

A. configuration.conf

def main():
    """Main executable function."""
    conf = configuration.conf

환경설정과 관련된 내용이 들어있을 것 같다.

Airflow는 airflow.cfg 파일을 통해 환경변수 값을 설정하니 맨 처음에 환경변수 관련된 설정을 해주는 것 같다. MWAA는 속을 파볼 수 없지만, 이 부분이 아래 사진에서 입력한 값을 읽어오도록 되어 있을 것 같다. 아니면 MWAA 환경 설정 중 Airflow 구성 옵션(아래 사진)에 값을 입력하면 airflow.cfg 파일의 값이 갱신되는 구조이고 conf = configuration.conf 코드는 그대로일 수도 있겠다.

Airflow.cfg에 관련된 내용은 Airflow.cfg를 참고하면 된다.

Airflow.cfg에서 설정하는 환경변수 값은 Configuration을 참고하면 된다.

코드가 길게 짜여져 있겠지만, 결국 파일을 airflow.cfg 파일을 load해서 값을 읽는 구조일 것이다.

그러면, 환경변수별로 key와 value를 어떻게 구분하는지, Airflow source code에서는 airflow.cfg 파일을 어떻게 읽는지, 수 많은 환경변수를 어떤 방식으로 동적으로 체크하는지 궁금해지기 시작한다.

# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1816-L1818

conf = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()

B. initialize_config()

# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1480-L1490

conf = initialize_config()

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    global FERNET_KEY, AIRFLOW_HOME

    default_config = _parameterized_config_from_template("default_airflow.cfg")

    local_conf = AirflowConfigParser(default_config=default_config)
        ...

conf 객체는 initialize_config()로 생성된다. 객체 타입은 AirflowConfigParse로 보인다.

default_config 객체를 생성할 때 default_airflow.cfg파일에서 값을 읽어와서 AirflowConfigParser 객체를 생성해주는 것 같다. (여기에서 Airflow source code에서는 airflow.cfg 파일을 어떻게 읽는지에 대한 궁금증을 해소할 수 있을 것 같다.)

_parameterized_config_from_template()로 들어가보자.

C. default_airflow.cfg

...

# Users should not modify this file; they should customize the generated
# airflow.cfg instead.


# ----------------------- TEMPLATE BEGINS HERE -----------------------

[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = {AIRFLOW_HOME}/dags

...

_parameterized_config_from_template()로 넘어가기 전에 default_airflow.cfg가 어떤 구조인지 확인해보자.

# ~~ 는 주석으로 표현된 것 같은데, 이게 _parameterized_config_from_template()에서 어떻게 값이 빠질지 궁금하다.

[core] 는 환경변수의 category같은 느낌인데 이건 어떻게 사용될지 궁금하다.

{AIRFLOW_HOME} python의 환경변수를 가져오는 부분 같다. { }로 표현한 것은 나중에 .format(**dict)으로 { }안에 key에 매칭되는 value를 넣어주기 위함으로 보인다.

D. _parameterized_config_from_template()

# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1444-L1453

def _parameterized_config_from_template(filename) -> str:
    TEMPLATE_START = "# ----------------------- TEMPLATE BEGINS HERE -----------------------\n"

    path = _default_config_file_path(filename)
    with open(path) as fh:
        for line in fh:
            if line != TEMPLATE_START:
                continue
            return parameterized_config(fh.read().strip())
    raise RuntimeError(f"Template marker not found in {path!r}")

default_airflow.cfg을 읽기 위해 path를 구하고 파일의 값 중에 -- TEMPLATE BEGINS HERE -- 부터 읽어서 str 데이터 타입을 return하는 방식이다.

에러를 발생시키는 부분도 조금 특이하다. default_airflow.cfg 파일의 값이 잘못되었다면 RuntimeError 를 발생시킨다. runtime 중에만 알 수 있는 에러라서 ValueError가 아닌 RuntimeError를 쓴 것일까?

# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1456-L1463

def parameterized_config(template) -> str:
    """
    Generates configuration from provided template & variables defined in current scope.

    :param template: a config content templated with {{variables}}
    """
    all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
    return template.format(**all_vars)

[globals(), locals()]는 처음봤다. 설명은 여기를 참고하면 된다.

.format(**dict) format 함수에 dictionary unpacking을 진행해서 default_airflow.cfg 값 중에서 { }를 채워준다. Unpack a dictionary to format 을 참고하면 동작이 이해될 것 이다.

default_airflow.cfg 값이 str 데이터 타입으로 default_config객체에 할당된다. 그리고 AirflowConfigParser 클래스의 생성자 argument로 전달된다.

E. AirflowConfigParser

# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L164-L171

class AirflowConfigParser(ConfigParser):
    """Custom Airflow Configparser supporting defaults and deprecated options."""

    # These configuration elements can be fetched as the stdout of commands
    # following the "{section}__{name}_cmd" pattern, the idea behind this
    # is to not store password on boxes in text files.
    # These configs can also be fetched from Secrets backend
    # following the "{section}__{name}__secret" pattern

    ...

custom airflow configparser.

default_airflow.cfg를 parsing하는 클래스로 보인다. 이 클래스에서 C. default_airflow.cfg에서 궁금해하던 부분을 해소할 수 있을 것으로 보인다.

특이점으로 보통 클래스는 클래스변수 다음에 생성자를 작성해둘텐데, AirflowConfigParser 클래스는 조금 다르다. 아래 사진처럼 생성자 메서드보다 위에 작성된 메서드가 존재한다. 이것도 파이써닉인가?

어쨋든, AirflowConfigParser 객체를 생성하고 바로 호출한 메서드가 아래 getboolean이다. 메서드 이름에서 파이써닉한 느낌이 전혀 들지 않는다. 보통 파이썬은 snake case로 작성할텐데... 흠..

local_conf.getboolean("core", "unit_test_mode")
a. getboolean()

getboolean을 타고 넘어가면 아래 코드가 나온다.

아래 코드를 보고 알 수 있는 것은 default_airflow.cfg에서 대괄호([ ])에 적힌 내용을 section이라고 부르고 세부 환경 변수를 key라고 부르는 것 같다.

그리고 getboolean, getint, getfloat ... 으로 환경변수 값을 가져오는 것으로 보인다.

# airflow.configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L765C10-L777

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
        val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
        if "#" in val:
                val = val.split("#")[0].strip()
        if val in ("t", "true", "1"):
                return True
        elif val in ("f", "false", "0"):
                return False
        else:
                raise AirflowConfigException(
                        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
                        f'Current value: "{val}".'
                )

좀 더 깊이 들어가보자. self.get()으로 들어가보면, 코드는 아래와 같다.

# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L566-L587

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:  # type: ignore[override]
        ...

@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None:  # type: ignore[override]
        ...

def get(  # type: ignore[override, misc]
        self,
        section: str,
        key: str,
        _extra_stacklevel: int = 0,
        **kwargs,
) -> str | None:
        section = str(section).lower()
        key = str(key).lower()
        warning_emitted = False
        deprecated_section: str | None
        deprecated_key: str | None

...

ㅋㅋ 뭔가 이상하다. 파이썬은 오버로딩을 지원하지 않는다.는 지식을 가지고 있던 내 상식이 잘못된건가 싶었다. 이에 대해서는 python type hints how to use overload에 자세히 설명되어 있다.

꼼수같지만, 파이썬에서 공식적으로 지원하는 방식 같다. 아래 사진처럼 자동 완성을 의도한 것인지는 모르겠으나, 이런 방식으로 사용하는 것을 권장하려고 저런 방식으로 작성한 것 같다.

# airflow.configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L587-L613

# For when we rename whole sections
if section in self.inversed_deprecated_sections:
        deprecated_section, deprecated_key = (section, key)
        section = self.inversed_deprecated_sections[section]
        if not self._suppress_future_warnings:
                warnings.warn(
                        f"The config section [{deprecated_section}] has been renamed to "
                        f"[{section}]. Please update your `conf.get*` call to use the new name",
                        FutureWarning,
                        stacklevel=2 + _extra_stacklevel,
                )
        # Don't warn about individual rename if the whole section is renamed
        warning_emitted = True
elif (section, key) in self.inversed_deprecated_options:
        # Handle using deprecated section/key instead of the new section/key
        new_section, new_key = self.inversed_deprecated_options[(section, key)]
        if not self._suppress_future_warnings and not warning_emitted:
                warnings.warn(
                        f"section/key [{section}/{key}] has been deprecated, you should use"
                        f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                        "new name",
                        FutureWarning,
                        stacklevel=2 + _extra_stacklevel,
                )
                warning_emitted = True
        deprecated_section, deprecated_key = section, key
        section, key = (new_section, new_key)

default_airflow.cfg에서 deprecated된 section이나 option이 있을 경우를 체크한다. deprecateded된 section, option이 있다면, 새로운 section, option으로 변경해준다.

# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L624-L670

# first check environment variables
option = self._get_environment_variables(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
)
if option is not None:
        return option

# ...then the config file
option = self._get_option_from_config_file(
        deprecated_key,
        deprecated_section,
        key,
        kwargs,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
)
if option is not None:
        return option

# ...then commands
option = self._get_option_from_commands(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
)
if option is not None:
        return option

# ...then from secret backends
option = self._get_option_from_secrets(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
)
if option is not None:
        return option

차례대로 environment variable, config file, command, secrets에 key와 section이 존재하는지 체크하면서 option을 가져온다.

단순한 if문의 반복으로 보인다. 아래 처럼 좀 더 깔끔하게 개선해볼 수 있을 것 같은데, 한번 issue로 등록해볼까?

funcs = [
        self._get_environment_variables,
        self._get_option_from_config_file,
        self._get_option_from_commands,
        self._get_option_from_secrets,
]

for func in funcs:
        option = func(
                deprecated_key,
                deprecated_section,
                key,
                section,
                issue_warning=not warning_emitted,
                extra_stacklevel=_extra_stacklevel,
        )

        if option is not None:
                return option

엄청 깊게 들어가지는 말아야겠다.결국 AirflowConfigParser 클래스에 default_airflow.cfg을 집어넣고 정규표현식으로 파싱한 다음 적절히 section과 key를 받아서 값을 리턴하는 방식이다.

C. secrets_backend_list = initialize_secrets_backends()

A. configuration.conf로 다시 넘어가보자.

여기서는 secret value를 읽어오는 방식으로 보인다.

# airflow.configuration.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/configuration.py#L1730-L1748

def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list
    """
    backend_list = []

    custom_secret_backend = get_custom_secret_backend()

    if custom_secret_backend is not None:
        backend_list.append(custom_secret_backend)

    for class_name in DEFAULT_SECRETS_SEARCH_PATH:
        secrets_backend_cls = import_string(class_name)
        backend_list.append(secrets_backend_cls())

    return backend_list

secret만 별도로 뺀 이유는 뭘까? 이전에 default_airflow.cfg를 읽어올 때 한꺼번에 읽어서 처리하면 되지 않을까? 아니면 별도로 저장해야하는 이유가 있는걸까?

별도로 사용하는 부분을 찾지 못했다.

흠.

이 글이 도움이 되었나요?

신고하기
0분 전
작성된 댓글이 없습니다. 첫 댓글을 달아보세요!
    댓글을 작성하려면 로그인이 필요합니다.