# PySpark Coding Convention 저는 Apache Spark를 21년 12월경에 처음 접하였으며, 최근들어 하루 중 코딩하는 시간의 절반은 PySpark로 코드를 작성하고 있습니다. 이 글의 목적은 모두가 더 나은 PySpark 코드를 작성하였으면 하는 바램입니다. 따라서, 제가 추구하는 PySpark 작성 규칙들은 무엇인지, 또 어떤 생각으로 이렇게 정하였는지에 대해 설명드리려 합니다. (코드는 자고로 깔끔하고, 규칙적이며, 명시적으로 작성되어야 한다고 생각합니다. 많은 이유가 있겠지만, 가장 중요하게 생각하는 것은 운영 중 코드 레벨에서 문제가 발생했을 때, 명확한 규칙이 있다면 매우 간단히, 그리고 실수 없이 코드를 수정할 수 있기 때문입니다. —추가로, 프로그래머는 다양한 본인만의 스타일이 있습니다. 저는 모든 스타일을 존중합니다.) 우선, 이 글을 쓰게 된 것에 영감을 준 Chirs Moradi에게 감사를 표하며, 그의 글인 [Focus on Maintainability and Break Those ETL Tasks Up](https://medium.com/97-things/focus-on-maintainability-and-break-those-etl-tasks-up-f39aa988a8de)을 공유합니다. 우선, 가장 많이 사용하는 메서드들을 활용한 Read - Transformation - Write 작업에 제 규칙을 부여한 코드를 보여드리겠습니다. ```python raw_df = spark.read.format('parquet').load('s3://bucket_name/table_name') transformed_df = (raw_df .filter( (col('date') >= '20250111') & ... ) .groupby('date') .agg( countDistinct('some_id').alias('some_id_dist_cnt'), ... ) .sort('date') ) # TODO: when().otherwise() 추가 join_source_df = ... joined_df = (transformed_df .join( join_source_df, on='some_id', how='inner' ) ) joined_df = joined_df.cache() # or persist() result_df = (joined_df .select( date_format(current_date(), 'yyyyMMdd').alias('etl_date') ... ) ) (result_df.write.format('parquet') .mode('overwrite') .option('partitionOverwriteMode', 'dynamic') .partitionBy( 'year', 'month', 'day' ) .save('s3://bucket_name/table_name') ) ``` 위 코딩 컨벤션과 같은 규칙이 깔끔해보이거나 공감되신다면 아래의 내용을 더 주시길 바랍니다. 제 규칙을 글로 설명하면 다음과 같습니다. 1. Python의 메서드 체이닝을 적극 활용한다. 1. 대상 DataFrame은 괄호의 바로 뒤에 사용하고 줄바꿈한다. 2. 각 메서드와 그 안의 값이 3개 이상이면 줄바꿈을 하고 항상 하나의 indent를 적용한다. 2. 메서드 안에서 쓰이는 값이 3개 이상이거나 너무 길 경우 줄바꿈을 적용한다. 3. 맨 처음 불러온 DataFrame의 변수명에는 raw를 넣는다. 4. DataFrame 객체에 해당하는 변수에는 항상 맨 끝에 df를 넣는다. 자, 위 코드를 좀 더 나누고 자세히 설명드리겠습니다. ### 1. 데이터 불러오기 ```python raw_df = spark.read.format('parquet').load('s3://bucket_name/table_name') ``` raw_df 선언 과정에는 크게 할 얘기는 없습니다. 다만, 보통 `read.format(’parquet’).load()` 와 같이 쓰는 형태와 `read.parquet(file_path)` 가 있습니다. 제 경우에는 `format()` 과 같이 사용하는데, 그 이유는 이후 필요한 option()을 적용하거나, 다른 format으로 변경할 때에 메서드를 변경하는 것이 아닌 파라미터를 변경하는 구조로 더 읽기 쉽고 유연하기 때문입니다. 또한 변수명은 항상 `*_raw_df` 와 같은 형태로 사용합니다. 예를 들어 user_event 테이블을 불러왔을 경우에는 `user_event_raw_df` 와 같은 형태인 것이죠. (이 부분은 raw_*_df와 *_raw_df와 같은 두 가지 방식이 있을 수 있습니다) 종종 file_path에 들어가는 텍스트가 긴 경우도 있습니다. 그럴 경우 메서드 체이닝과 함께 `.load()` 를 한 라인 내리는 방식으로 사용합니다. ```python raw_df = (spark.read.format('parquet') .load('s3://bucket_name/db_name/table_name/...') ) ``` 여기에 만약 option()을 추가한다면, 다음과 같이 작성할 수 있겠습니다. ```python raw_df = (spark.read.format('parquet') .option(...) .load('s3://bucket_name/db_name/table_name/...') ) ``` ### 2. 데이터 변환하기 ```python transformed_df = (raw_df .filter( (col('date') >= '20250111') & ... ) .groupby('date') .agg( countDistinct('some_id').alias('some_id_dist_cnt'), ... ) .sort('date') ) ``` 여기에는 많은 내용이 담겨있으니 보다 자세히 설명해보겠습니다. ### 2-1. 데이터 변환하기: 대상 DataFrame의 위치 1번 라인을 보시면, `raw_df` 를 남겨두고 그 나머지는 전부 줄바꿈한 것을 볼 수 있습니다. 종종 외국의 PySpark 예제를 보면 제가 작성한 것과는 다르게 `raw_df` 또한 줄바꿈을 한 경우가 많습니다. ```python # 대상 DataFrame을 한번 더 줄바꿈 한 형태(저는 이 방식을 추천하지 않습니다) transformed_df = ( raw_df .filter() ... ) ``` 이렇게 작성하는 것도 괜찮아보이지만, 제 의견으로는 대상이되는 DataFrame 변수와 이후 메서드의 indent가 동일하여 덜 명시적인 것 같다고 생각이 됩니다. 또한 변수를 선언한 1번 라인에 정보가 너무 적다고 생각됩니다. 따라서, 저는 첫 줄에 변수 선언과 함께 대상이 되는 DataFrame을 함께 작성하기를 추천합니다. ### 2-2. 데이터 변환하기: `filter()` 자, 다음은 `filter()` 부분을 살펴보겠습니다. ```python transformed_df = (raw_df .filter( (col('date') >= '20250111') & ... ) ) ``` 만약 filter 조건이 하나라면, 한 줄에 나열하는 형태로 사용합니다. ```python transformed_df = raw_df.filter(col('date') >= '20250111') ``` 하지만 조건이 하나 이상이라면, 다음과 같이 작성합니다. ```python transformed_df = (raw_df .filter( (col('date') >= '20250111') & (col('some_id') == '1234') ) ) ``` 물론, 이 부분에도 다양한 의견이 있으며, 주로 다음과 같은 형태로도 많이 작성됩니다. ```python # 메서드 체이닝을 사용하지 않은 형태 transformed_df = raw_df.filter( (col('date') >= '20250111') & (col('some_id') == '1234') ) # 메서드 체이닝을 사용하고, AND(&) 또는 OR(|)을 조건문 뒤에 매치한 형태 transformed_df = (raw_df .filter( (col('date') >= '20250111') & (col('some_id') == '1234') ) ) ``` 제 경험 상 데이터를 처리할 때 메서드를 하나만 쓰는 경우가 적었습니다. 그 말은 메서드 DataFrame 이에 따라 라인이 조금이라도 길어질 경우 메서드 체이닝과 함께 indent를 규칙에 맞게 넣는 형태로 많이 작성하였습니다. 이어서, filter의 여러 조건을 사용할 때 AND 또는 OR 조건 뒤에 매치했을 때와 앞에 배치했을 때를 설명드리겠습니다. 만약 AND, OR를 filter의 조건 뒤에 매치하면 어떤 불편함이 발생할까요? 저는 이런 경험을 종종 했습니다. filter의 조건 중 하나를 주석처리하거나 제거해야 한다. 그렇게 될 경우 다음과 같이 코드를 수정하게 되겠죠. (보통 filter의 맨 처음 조건은 그 뒤의 조건보다 변경될 확률이 적은 조건이 먼저 오게 됩니다) ```python transformed_df = (raw_df .filter( (col('date') >= '20250111') & # (col('some_id') == '1234') ) ) ``` 이 때, Python은 해당 문법이 맞지 않다는 에러를 발생시킵니다. 그렇다면, AND, OR를 filter의 조건 앞에 두면 어떨까요? ```python transformed_df = (raw_df .filter( (col('date') >= '20250111') # & (col('some_id') == '1234') ) ) ``` 이 때는 문법 상 틀린 부분이 없기 때문에 정상 동작하게 됩니다. 따라서 저는 AND, OR를 조건 앞에 두는 규칙으로 정하게 되었습니다. 작성중... :)