# [Airflow 파먹기] main - configuration

- Author: @mildsalmon
- Published: 2023-05-14
- Updated: 2023-05-20
- Source: http://blex.me/@mildsalmon/airflow-%ED%8C%8C%EB%A8%B9%EA%B8%B0-main-configuration
- Tags: airflow파먹기

---

# 1. configuration

### A. configuration.conf

```python
def main():
    """Main executable function."""
    conf = configuration.conf
```

환경설정과 관련된 내용이 들어있을 것 같다.

Airflow는 airflow.cfg 파일을 통해 환경변수 값을 설정하니 맨 처음에 환경변수 관련된 설정을 해주는 것 같다. MWAA는 속을 파볼 수 없지만, 이 부분이 아래 사진에서 입력한 값을 읽어오도록 되어 있을 것 같다. 아니면 MWAA 환경 설정 중 Airflow 구성 옵션(아래 사진)에 값을 입력하면 airflow.cfg 파일의 값이 갱신되는 구조이고 `conf = configuration.conf` 코드는 그대로일 수도 있겠다.

![](https://static.blex.me/images/content/2023/5/14/20235149_po9E2LxZnLYlqpgAkZUO.jpg)

Airflow.cfg에 관련된 내용은 [Airflow.cfg](https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html)를 참고하면 된다.

Airflow.cfg에서 설정하는 환경변수 값은 [Configuration](https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html)을 참고하면 된다.

코드가 길게 짜여져 있겠지만, 결국 파일을 airflow.cfg 파일을 load해서 값을 읽는 구조일 것이다.

그러면, `환경변수별로 key와 value를 어떻게 구분하는지`, `Airflow source code에서는 airflow.cfg 파일을 어떻게 읽는지`, `수 많은 환경변수를 어떤 방식으로 동적으로 체크하는지` 궁금해지기 시작한다.

```python
# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1816-L1818

conf = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()
```

### B. initialize_config()

```python
# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1480-L1490

conf = initialize_config()

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.
    """
    global FERNET_KEY, AIRFLOW_HOME

    default_config = _parameterized_config_from_template("default_airflow.cfg")

    local_conf = AirflowConfigParser(default_config=default_config)
		...
```

conf 객체는 `initialize_config()`로 생성된다. 객체 타입은 `AirflowConfigParse`로 보인다.

default_config 객체를 생성할 때 `default_airflow.cfg`파일에서 값을 읽어와서 AirflowConfigParser 객체를 생성해주는 것 같다. (여기에서 `Airflow source code에서는 airflow.cfg 파일을 어떻게 읽는지`에 대한 궁금증을 해소할 수 있을 것 같다.)

`_parameterized_config_from_template()`로 들어가보자.

### C. default_airflow.cfg

```
...

# Users should not modify this file; they should customize the generated
# airflow.cfg instead.


# ----------------------- TEMPLATE BEGINS HERE -----------------------

[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = {AIRFLOW_HOME}/dags

...
```

`_parameterized_config_from_template()`로 넘어가기 전에 `default_airflow.cfg`가 어떤 구조인지 확인해보자.

`# ~~` 는 주석으로 표현된 것 같은데, 이게 `_parameterized_config_from_template()`에서 어떻게 값이 빠질지 궁금하다.

`[core]` 는 환경변수의 category같은 느낌인데 이건 어떻게 사용될지 궁금하다.

`{AIRFLOW_HOME}` python의 환경변수를 가져오는 부분 같다. `{ }`로 표현한 것은 나중에 `.format(**dict)`으로 `{ }`안에 key에 매칭되는 value를 넣어주기 위함으로 보인다.

### D. _parameterized_config_from_template()

```python
# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1444-L1453

def _parameterized_config_from_template(filename) -> str:
    TEMPLATE_START = "# ----------------------- TEMPLATE BEGINS HERE -----------------------\n"

    path = _default_config_file_path(filename)
    with open(path) as fh:
        for line in fh:
            if line != TEMPLATE_START:
                continue
            return parameterized_config(fh.read().strip())
    raise RuntimeError(f"Template marker not found in {path!r}")
```

`default_airflow.cfg`을 읽기 위해 path를 구하고 파일의 값 중에 `-- TEMPLATE BEGINS HERE --` 부터 읽어서 str 데이터 타입을 return하는 방식이다.

에러를 발생시키는 부분도 조금 특이하다. `default_airflow.cfg` 파일의 값이 잘못되었다면 `RuntimeError` 를 발생시킨다. runtime 중에만 알 수 있는 에러라서 `ValueError`가 아닌 `RuntimeError`를 쓴 것일까?

```python
# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1456-L1463

def parameterized_config(template) -> str:
    """
    Generates configuration from provided template & variables defined in current scope.

    :param template: a config content templated with {{variables}}
    """
    all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
    return template.format(**all_vars)
```

`[globals(), locals()]`는 처음봤다. 설명은 [여기](https://congcoding.tistory.com/55)를 참고하면 된다.

`.format(**dict)` format 함수에 dictionary unpacking을 진행해서 `default_airflow.cfg` 값 중에서 `{ }`를 채워준다. [Unpack a dictionary to format](https://stackoverflow.com/questions/46842099/unpack-a-dictionary-to-format) 을 참고하면 동작이 이해될 것 이다.

`default_airflow.cfg` 값이 str 데이터 타입으로 `default_config`객체에 할당된다. 그리고 `AirflowConfigParser` 클래스의 생성자 argument로 전달된다.

### E. AirflowConfigParser

```python
# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L164-L171

class AirflowConfigParser(ConfigParser):
    """Custom Airflow Configparser supporting defaults and deprecated options."""

    # These configuration elements can be fetched as the stdout of commands
    # following the "{section}__{name}_cmd" pattern, the idea behind this
    # is to not store password on boxes in text files.
    # These configs can also be fetched from Secrets backend
    # following the "{section}__{name}__secret" pattern

	...
```

custom airflow configparser.

`default_airflow.cfg`를 parsing하는 클래스로 보인다. 이 클래스에서 `C. default_airflow.cfg`에서 궁금해하던 부분을 해소할 수 있을 것으로 보인다.

특이점으로 보통 클래스는 클래스변수 다음에 생성자를 작성해둘텐데, AirflowConfigParser 클래스는 조금 다르다. 아래 사진처럼 생성자 메서드보다 위에 작성된 메서드가 존재한다. ~~이것도 파이써닉인가?~~

![](https://static.blex.me/images/content/2023/5/16/202351618_2Gapso1JIPRdGpt1Awlq.jpg)

어쨋든, AirflowConfigParser 객체를 생성하고 바로 호출한 메서드가 아래 `getboolean`이다. 메서드 이름에서 파이써닉한 느낌이 전혀 들지 않는다. 보통 파이썬은 snake case로 작성할텐데... 흠..

```python
local_conf.getboolean("core", "unit_test_mode")
```

##### a. getboolean()

getboolean을 타고 넘어가면 아래 코드가 나온다.

아래 코드를 보고 알 수 있는 것은 `default_airflow.cfg`에서 대괄호(\[ \])에 적힌 내용을 section이라고 부르고 세부 환경 변수를 key라고 부르는 것 같다.

그리고 `getboolean`, `getint`, `getfloat` ... 으로 환경변수 값을 가져오는 것으로 보인다.

```python
# airflow.configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L765C10-L777

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
		val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
		if "#" in val:
				val = val.split("#")[0].strip()
		if val in ("t", "true", "1"):
				return True
		elif val in ("f", "false", "0"):
				return False
		else:
				raise AirflowConfigException(
						f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
						f'Current value: "{val}".'
				)
```

좀 더 깊이 들어가보자. `self.get()`으로 들어가보면, 코드는 아래와 같다. 

```python
# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L566-L587

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:  # type: ignore[override]
		...

@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None:  # type: ignore[override]
		...

def get(  # type: ignore[override, misc]
		self,
		section: str,
		key: str,
		_extra_stacklevel: int = 0,
		**kwargs,
) -> str | None:
		section = str(section).lower()
		key = str(key).lower()
		warning_emitted = False
		deprecated_section: str | None
		deprecated_key: str | None

...
```

ㅋㅋ 뭔가 이상하다. `파이썬은 오버로딩을 지원하지 않는다.`는 지식을 가지고 있던 내 상식이 잘못된건가 싶었다. 이에 대해서는 [python type hints how to use overload](https://adamj.eu/tech/2021/05/29/python-type-hints-how-to-use-overload/)에 자세히 설명되어 있다.

꼼수같지만, 파이썬에서 공식적으로 지원하는 방식 같다. 아래 사진처럼 자동 완성을 의도한 것인지는 모르겠으나, 이런 방식으로 사용하는 것을 권장하려고 저런 방식으로 작성한 것 같다.

![](https://static.blex.me/images/content/2023/5/16/202351618_kZU9GiKsgr4vkjbIQ3XD.jpg)

```python
# airflow.configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L587-L613

# For when we rename whole sections
if section in self.inversed_deprecated_sections:
		deprecated_section, deprecated_key = (section, key)
		section = self.inversed_deprecated_sections[section]
		if not self._suppress_future_warnings:
				warnings.warn(
						f"The config section [{deprecated_section}] has been renamed to "
						f"[{section}]. Please update your `conf.get*` call to use the new name",
						FutureWarning,
						stacklevel=2 + _extra_stacklevel,
				)
		# Don't warn about individual rename if the whole section is renamed
		warning_emitted = True
elif (section, key) in self.inversed_deprecated_options:
		# Handle using deprecated section/key instead of the new section/key
		new_section, new_key = self.inversed_deprecated_options[(section, key)]
		if not self._suppress_future_warnings and not warning_emitted:
				warnings.warn(
						f"section/key [{section}/{key}] has been deprecated, you should use"
						f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
						"new name",
						FutureWarning,
						stacklevel=2 + _extra_stacklevel,
				)
				warning_emitted = True
		deprecated_section, deprecated_key = section, key
		section, key = (new_section, new_key)
```

`default_airflow.cfg`에서 deprecated된 section이나 option이 있을 경우를 체크한다. deprecateded된 section, option이 있다면, 새로운 section, option으로 변경해준다.

```python
# airflow/configuration.py
# https://github.com/apache/airflow/blob/main/airflow/configuration.py#L624-L670

# first check environment variables
option = self._get_environment_variables(
		deprecated_key,
		deprecated_section,
		key,
		section,
		issue_warning=not warning_emitted,
		extra_stacklevel=_extra_stacklevel,
)
if option is not None:
		return option

# ...then the config file
option = self._get_option_from_config_file(
		deprecated_key,
		deprecated_section,
		key,
		kwargs,
		section,
		issue_warning=not warning_emitted,
		extra_stacklevel=_extra_stacklevel,
)
if option is not None:
		return option

# ...then commands
option = self._get_option_from_commands(
		deprecated_key,
		deprecated_section,
		key,
		section,
		issue_warning=not warning_emitted,
		extra_stacklevel=_extra_stacklevel,
)
if option is not None:
		return option

# ...then from secret backends
option = self._get_option_from_secrets(
		deprecated_key,
		deprecated_section,
		key,
		section,
		issue_warning=not warning_emitted,
		extra_stacklevel=_extra_stacklevel,
)
if option is not None:
		return option
```
	
차례대로 environment variable, config file, command, secrets에 key와 section이 존재하는지 체크하면서 option을 가져온다.

단순한 if문의 반복으로 보인다. 아래 처럼 좀 더 깔끔하게 개선해볼 수 있을 것 같은데, 한번 issue로 등록해볼까?

```python
funcs = [
		self._get_environment_variables,
		self._get_option_from_config_file,
		self._get_option_from_commands,
		self._get_option_from_secrets,
]

for func in funcs:
		option = func(
				deprecated_key,
				deprecated_section,
				key,
				section,
				issue_warning=not warning_emitted,
				extra_stacklevel=_extra_stacklevel,
		)

		if option is not None:
				return option
```

엄청 깊게 들어가지는 말아야겠다.결국 `AirflowConfigParser` 클래스에 `default_airflow.cfg`을 집어넣고 정규표현식으로 파싱한 다음 적절히 section과 key를 받아서 값을 리턴하는 방식이다.

### C. secrets_backend_list = initialize_secrets_backends()

A. configuration.conf로 다시 넘어가보자.

여기서는 secret value를 읽어오는 방식으로 보인다.

```python
# airflow.configuration.py
# https://github.com/apache/airflow/blob/v2-6-stable/airflow/configuration.py#L1730-L1748

def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Initialize secrets backend.

    * import secrets backend classes
    * instantiate them and return them in a list
    """
    backend_list = []

    custom_secret_backend = get_custom_secret_backend()

    if custom_secret_backend is not None:
        backend_list.append(custom_secret_backend)

    for class_name in DEFAULT_SECRETS_SEARCH_PATH:
        secrets_backend_cls = import_string(class_name)
        backend_list.append(secrets_backend_cls())

    return backend_list
```
	
secret만 별도로 뺀 이유는 뭘까? 이전에 default_airflow.cfg를 읽어올 때 한꺼번에 읽어서 처리하면 되지 않을까? 아니면 별도로 저장해야하는 이유가 있는걸까?

별도로 사용하는 부분을 찾지 못했다.

흠.
