본문 바로가기

Python

Prefect

반응형

 

파이썬과 Prefect를 이용한 배치 프로세스 자동화

Prefect를 사용하여 데이터 워크플로우를 자동화하고 관리하는 방법을 알아봅시다.

Prefect란 무엇인가?

Prefect는 파이썬으로 구축된 현대적인 워크플로우 오케스트레이션 플랫폼입니다. 데이터 워크플로우를 자동화하고, 조정하며, 모니터링하는 것을 목적으로 설계되었습니다. Prefect는 배치 프로세스를 스케줄링하고, 데이터 파이프라인이 원활하게 실행되도록 보장하며, 오류를 우아하게 처리하는 데 이상적입니다.

시나리오: 엑셀 데이터 처리 및 API 요청

이 시나리오에서는 엑셀 파일에서 데이터를 로드하고, API에 POST 요청을 하는 배치 프로세스를 자동화하는 방법을 Prefect를 사용하여 다루게 됩니다. 다음과 같은 단계를 포함합니다:

  1. 엑셀 파일에서 데이터를 로드합니다.
  2. 정해진 개수만큼 데이터를 처리하여 API 요청을 합니다.
  3. 각 API 요청의 성공 또는 실패를 기록합니다.
  4. 엑셀 파일의 모든 데이터가 처리될 때까지 과정을 반복합니다.

Prefect를 이용한 구현

각 단계의 작업을 정의하고, 플로우를 생성하여 실행 순서와 의존성을 관리합니다.

엑셀 데이터 로드

# 엑셀 데이터 로드를 위한 태스크
@task
def load_excel_data(file_path):
    return pd.read_excel(file_path)
            

데이터 처리 및 API 요청

# 데이터 처리 및 API 요청을 위한 태스크
@task
def process_and_request(data_chunk, user_agent_map):
    # 데이터 처리 및 API 요청 로직 작성
            

결과 기록

# API 요청 결과를 저장하기 위한 태스크
@task
def save_results(file_path, results):
    # 결과 저장 로직 작성
            

플로우 생성 및 실행

# 태스크를 조정하기 위한 플로우
with Flow("batch_process_excel_data") as flow:
    # 플로우 정의 내용

# 플로우 실행
flow.run(parameters={
    'file_path': 'data.xlsx',
    'user_agent_map': {1: 'TypeA', 2: 'TypeB'}
})
            

결론

Prefect를 사용한 배치 프로세스 자동화는 데이터 워크플로우를 관리하는 강력하고 확장 가능한 방법을 제공합니다. 복잡한 작업, 재시도, 오류 로깅을 처리할 수 있는 능력을 갖추고 있어, 데이터 엔지니어나 과학자가 데이터 프로세스를 자동화하는 데 있어 가치 있는 도구가 될 수 있습니다.

게시자: ByteBridge

 

전체 참고용 코드

from prefect import task, Flow, Parameter
import pandas as pd
import requests

# API 요청을 위한 함수
def send_api_request(data):
    # API 요청을 보내고 결과를 반환
    # 예시로 requests.post('https://api.example.com', json=data)를 사용할 수 있습니다.
    response = requests.post('https://api.example.com', json=data)
    return response.ok  # 성공 여부 반환

# 엑셀 데이터 로드
@task
def load_excel_data(file_path):
    return pd.read_excel(file_path)

# 데이터 전처리 및 API 요청
@task
def process_and_request(data_chunk, user_agent_map):
    results = []
    for index, row in data_chunk.iterrows():
        # 데이터 전처리 로직
        user_agent = user_agent_map.get(row['user_agent_type'], 'default')
        data = {
            'payment_id': row['payment_id'],
            'user_agent': user_agent,
            # 추가 필드...
        }
        # API 요청
        success = send_api_request(data)
        results.append(success)
    return results

# 기록을 저장하는 함수
@task
def save_results(file_path, results):
    with open(file_path, 'a') as f:
        for result in results:
            f.write(f"{result}\n")

# 플로우 정의
with Flow("batch_process_excel_data") as flow:
    # 파일 경로와 사용자 에이전트 매핑을 파라미터로 받음
    file_path = Parameter('file_path')
    user_agent_map = Parameter('user_agent_map')
    # 엑셀 데이터 로드
    excel_data = load_excel_data(file_path)
    # 데이터를 청크로 나누어 처리 (예: 100개씩)
    for start in range(0, len(excel_data), 100):
        data_chunk = excel_data[start:start+100]
        # 데이터 전처리 및 API 요청
        results = process_and_request(data_chunk, user_agent_map)
        # 결과 저장
        save_results(file_path, results)

# 필요시 플로우 스케줄링
# flow.schedule = Schedule(cron="0 0 * * *")  # 매일 자정에 실행

# 플로우 실행 (로컬에서 실행할 때)
if __name__ == "__main__":
    flow.run(parameters={
        'file_path': 'data.xlsx',
        'user_agent_map': {1: 'TypeA', 2: 'TypeB'}
    })

 

반응형

'Python' 카테고리의 다른 글

Start Django project with Python DI manager poetry  (0) 2021.08.22
python data class  (0) 2020.01.19
Python application with Docker Image build and Run  (0) 2019.07.14
python file line read and set  (0) 2019.06.15
pymongo usage  (0) 2019.06.08