[Airflow 개선 - DAG Factory] 1. 왜 하는가?

0. 요약

작성하던 DAG script code에서 중복 코드 문제가 발생하였다. 이 문제를 Design Patterns을 활용하여 개선하려고 한다.

1. 왜, DAG Factory를 구현하려고 해?

팀이 빠르게 데이터 파이프라인을 구축해서 한시라도 빨리 데이터를 적재해야 했기 때문에 사소한 부분은 무시하고 넘어갔었다. 그렇지만 이런 사소한 작은 부분들이 모여서 크고 복잡한 문제로 변신하기 때문에 전체적인 코드를 되돌아보려고 한다. 겸사겸사 최근에 공부했던 OOP와 Gangs of Four (GoF) Design Patterns중에 Creational Patterns의 Factory Method 패턴, Builder 패턴을 사용하여 코드를 개선하려고 한다.

우리는 팀 내의 공통 컨벤션을 맞추기 위해 Pylint(정적 분석 도구)와 Black(코드 포매터)을 사용한다. 그리고 매번 코드를 gitlab에 push할 때마다 Pylint와 Black 명령을 입력하기 번거로워서 pre-commit-hook을 달아두었다. 이때, dag script code를 새로 생성할때마다 높은 확률로 pylint는 duplicate-code를 밷었다. 이전까지는 # pylint:disable=duplicate-code 으로 pylint를 무력화하여 개발을 진행했다. 하지만 이번 기회에 위 문단에서 말한 작은 문제(duplicate-code )를 해결하려고 한다.

A. 불필요한 반복 코드

duplicate-code는 여러개의 DAG code가 DAG class와 Task (ABCOperator, ABCSensor ..) Class의 객체를 반복해서 사용하기 때문에 발생한다. 좀 더 구체적으로는 객체를 반복해서 생성하기 때문에 발생하기 보다는, class 객체를 생성할 때 전달하는 argument도 완전히 동일하기 때문에 발생하는 문제다.

기존에 DAG 코드를 작성할때는 매번 DAG class에 argument를 입력하는 것이 귀찮아서 기존에 작성한 DAG 코드를 복사해서 사용한다. DAG class의 argument가 매번 동일해도 큰 상관이 없어서 이렇게 사용했다. 그런데 이 편리한 방식이 duplicate-code를 밷는 원인이었다. 코드를 편리하게 작성하려던 꼼수로 인해 심적으로 불편한 상황인 된 것이다.

아래는 우리가 사용하는 코드를 추상화시킨 코드이다.

from airflow.models import DAG

from module.airflow.task import ABCOperator, ABCSensor

# pylint:disable=duplicate-code
with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
        schedule=None,
        start_date=pendulum.datetime(2022, 12, 12, tz=pendulum.timezone("Asia/Seoul")),
) as dag:
    abc_operator = ABCOperator(
            task_id='abc_operator',
                ..
    )

        abc_sensor = ABCSensor(
            task_id='abc_sensor'
                ...
        )

그리고 DAG script code를 전부 도식화해보면 아래 사진과 같다.

추상화된 DAG script code의 도식화
사진 1

2. 발생할 수 있는 더 큰 문제는 어떤 것이 있을까?

A. 변경 포인트가 분산된다. (동일한 argument 추가의 반복)

위 사진을 보면 모든 DAG script code가 DAG 객체를 생성하여 사용한다.

만약 DAG 객체의 argument 중에 render_template_as_native_obj=True를 추가해야하는 경우에는 어떻게 코드를 수정할 수 있을까?

from airflow.models import DAG

# pylint:disable=duplicate-code
with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
        schedule=None,
        start_date=pendulum.datetime(2022, 12, 12, tz=pendulum.timezone("Asia/Seoul")),
      render_template_as_native_obj=True,
) as dag:
    ...

아주 단순하게 생각하면 위 코드 블록처럼 모든 DAG script code를 수정하면 된다. 여기서 문제는 위 사진에서만 9개의 DAG script code를 수정하는 반복작업이 필요하다. 만약 DAG script code가 1000개라면, 동일한 작업을 1000번 반복해야 한다. 비효율적이고 끔찍한 반복작업이다.

B. 변경 포인트가 분산된다. (DAG class 구현에 의존)

만약에 Airflow 99.0.7 version이 출시되어서 DAG class가 deprecated되고 동일한 일을 수행하는 DalaG class가 생겼다고 가정해보자. 그러면 모든 DAG script code의 DAG 객체를 생성하는 부분을 아래와 같이 변경해야 한다.

from airflow.models import DalaG

# pylint:disable=duplicate-code
with DalaG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
        schedule=None,
        start_date=pendulum.datetime(2022, 12, 12, tz=pendulum.timezone("Asia/Seoul")),
      render_template_as_native_obj=True,
) as dag:
    ...

DAG script code가 1000개 있다면, 1000개 파일을 전부 수정해야 한다. 엄청 비효율적이다.

이런 것을 DAG script code들은 DAG 클래스 구현에 의존적이다 라고도 말할 수 있다.

3. 어떻게 개선하려고 해?

이와 비슷한 문제들은 아주 예전부터 있었고 이를 해결하기 위해 GoF Design Patterns에서는 여러 솔루션을 제공한다. DAG Factory는 Creational Patterns 중의 Factory Method, Task Factory에서는 Abstract Factory Patterns을 사용하여 해결하고자 한다.

엄청나게 대단한 기술을 사용하는 것처럼 보일 수 있는데, 막상 코드를 보면 그렇게 대단한 것은 아니다.

A. Simple Factory?

Factory는 객체 생성을 처리하는 클래스를 의미한다. 아래 코드처럼 생성 부분을 전담할 클래스를 만들 수 있는데, 아래와 같은 경우는 Simple Factory라고 한다.

class PizzaStore:
    def __init__(factory: SimplePizzaFactory):
        self.factory = factory
        
    def order_pizza(type: str):
        pizza: Pizza = self.factory.create_pizza(type)
        
        pizza.prepare()
        pizza.bake()
        pizza.cut()
        
        return pizza
class SimaplePizzaFactory:
    def create_pizza(type: str) -> Pizza:
        if (type == 'cheese'):
            pizza: Pizza = CheesePizza()
            ...
        retrun pizza

B. Factory Method ?

위 코드가 Factory Method가 아닌 Simple Factory인 이유는 추상 메소드로 선언해야할 create 메소드를 직접 구현하고 있기 때문이다. Simple Factory는 factory가 무엇인지 PizzaStore가 알아야한다는 단점이 있다.

Factory Method는 create 메소드를 추상 메소드로 선언하여 자식 클래스(서브 클래스)에서 결정하도록 한다. PizzaStore를 사용하는 코드에서는 어떤 PizzaFactory를 사용할지 고민하지 않아도 된다. PizzaStore에게 order_pizza 메소드를 요청하면 PizzaStore는 알아서 서브 클래스를 통해 Pizza를 만들고 pizza를 prepare, bake, cut해준다. (구체적으로 말하면, PizzaStore는 추상 클래스이고, PizzaStore를 호출하는 코드는 NYPizzaStore를 호출할 것이다. PizzaStore 클래스에서는 호출자가 NYPizzaStore, ChicagoPizzaStore 중 어떤 것을 선택했는지 알 필요가 없다.)

이를 통해 PizzaStore는 인터페이스(implement하는 것 말고 개념적으로)가 된다.

인터페이스를 통해 의존성 뒤집기(Dependency Inversion Principle)도 가능하게 된다.

Factory Method는 상속으로 객체를 만든다.

from abc import ABCMeta

class PizzaStore (metaclass=ABCMeta):
    def order_pizza(type: str) -> Pizza:
        pizza: Pizza = create_pizza(type)
        
        pizza.prepare()
        pizza.bake()
        pizza.cut()
        
        return pizza
    
    @abstractmethod
    def create_pizza(type: str):
        pass
class NYPizzaStore(PizzaStore):
    def create_pizza(type: str):
        if (type == 'cheese'):
            return CheesePizza()
            ...
        return None

C. Factory 패턴을 어떻게 활용하려고 해?

내가 처음 DAGFactory를 만들때 이게 Factory 패턴이 맞나?라는 의문을 가지고 코드를 작성했다. 이 의문에 대해 테크리더님께 질문드리니 이런 답변을 주셨다.

디자인 패턴의 규칙을 완벽하게 지키면서 코드를 개발할 수는 없다. 우리가 직면하고 있는 문제를 디자인 패턴 관점의 도움을 받아서 해결하려는 것이다.

모든 코드에 인터페이스를 만들면 좋겠지만, 굳이 그럴 필요 없는 부분의 복잡도를 높일 필요가 있을까?

DAGFactory에서는 SimpleFactory로 아주 간단하게 코드 중복이라는 문제를 해결할 것이다. 덤으로 관리 포인트를 한 곳으로 모으는 효과도 확인할 수 있다.

TaskFactory에서는 Factory Method 패턴을 사용하려고 한다. 이 부분은 아직 구상단계지만, Task(Operator, Sensor)가 다양해서 인터페이스를 하나 만들고 비슷한 궤를 하는 Task를 서브 클래스로 만들면 될 것이라는 생각이 든다.

D. Builder

빌더 패턴은 복합 객체 생성 과정을 캡슐화한다. 여러 단계와 다양한 절차를 거쳐 객체를 만들 수 있다. 추상 인터페이스 클래스를 사용하여 내부 구현을 클라이언트로부터 보호할 수 있다.

파이썬을 사용한다면 굳이 builder 패턴을 사용할 필요 없이 생성자에 다 집어넣으면 되는 것 아닌가? 라고 생각할 수도 있다.

  1. pylint에서 too-many-argument에러가 발생한다.
  2. 객체 생성에 공통적으로 반드시 필요하지 않은 인자도 생성자에 포함되기 때문에 생성자가 너무 거대해진다.

위 이유를 생각해보니, 빌더 패턴으로 푸는 것이 더 좋은 방법이라고 생각했다.

이 글이 도움이 되었나요?

신고하기
0분 전
작성된 댓글이 없습니다. 첫 댓글을 달아보세요!
    댓글을 작성하려면 로그인이 필요합니다.