Prefect는 파이썬으로 구축된 현대적인 워크플로우 오케스트레이션 플랫폼입니다. 데이터 워크플로우를 자동화하고, 조정하며, 모니터링하는 것을 목적으로 설계되었습니다. Prefect는 배치 프로세스를 스케줄링하고, 데이터 파이프라인이 원활하게 실행되도록 보장하며, 오류를 우아하게 처리하는 데 이상적입니다.
시나리오: 엑셀 데이터 처리 및 API 요청
이 시나리오에서는 엑셀 파일에서 데이터를 로드하고, API에 POST 요청을 하는 배치 프로세스를 자동화하는 방법을 Prefect를 사용하여 다루게 됩니다. 다음과 같은 단계를 포함합니다:
엑셀 파일에서 데이터를 로드합니다.
정해진 개수만큼 데이터를 처리하여 API 요청을 합니다.
각 API 요청의 성공 또는 실패를 기록합니다.
엑셀 파일의 모든 데이터가 처리될 때까지 과정을 반복합니다.
Prefect를 이용한 구현
각 단계의 작업을 정의하고, 플로우를 생성하여 실행 순서와 의존성을 관리합니다.
엑셀 데이터 로드
# 엑셀 데이터 로드를 위한 태스크
@task
def load_excel_data(file_path):
return pd.read_excel(file_path)
데이터 처리 및 API 요청
# 데이터 처리 및 API 요청을 위한 태스크
@task
def process_and_request(data_chunk, user_agent_map):
# 데이터 처리 및 API 요청 로직 작성
결과 기록
# API 요청 결과를 저장하기 위한 태스크
@task
def save_results(file_path, results):
# 결과 저장 로직 작성
플로우 생성 및 실행
# 태스크를 조정하기 위한 플로우
with Flow("batch_process_excel_data") as flow:
# 플로우 정의 내용
# 플로우 실행
flow.run(parameters={
'file_path': 'data.xlsx',
'user_agent_map': {1: 'TypeA', 2: 'TypeB'}
})
결론
Prefect를 사용한 배치 프로세스 자동화는 데이터 워크플로우를 관리하는 강력하고 확장 가능한 방법을 제공합니다. 복잡한 작업, 재시도, 오류 로깅을 처리할 수 있는 능력을 갖추고 있어, 데이터 엔지니어나 과학자가 데이터 프로세스를 자동화하는 데 있어 가치 있는 도구가 될 수 있습니다.
from prefect import task, Flow, Parameter
import pandas as pd
import requests
# API 요청을 위한 함수
def send_api_request(data):
# API 요청을 보내고 결과를 반환
# 예시로 requests.post('https://api.example.com', json=data)를 사용할 수 있습니다.
response = requests.post('https://api.example.com', json=data)
return response.ok # 성공 여부 반환
# 엑셀 데이터 로드
@task
def load_excel_data(file_path):
return pd.read_excel(file_path)
# 데이터 전처리 및 API 요청
@task
def process_and_request(data_chunk, user_agent_map):
results = []
for index, row in data_chunk.iterrows():
# 데이터 전처리 로직
user_agent = user_agent_map.get(row['user_agent_type'], 'default')
data = {
'payment_id': row['payment_id'],
'user_agent': user_agent,
# 추가 필드...
}
# API 요청
success = send_api_request(data)
results.append(success)
return results
# 기록을 저장하는 함수
@task
def save_results(file_path, results):
with open(file_path, 'a') as f:
for result in results:
f.write(f"{result}\n")
# 플로우 정의
with Flow("batch_process_excel_data") as flow:
# 파일 경로와 사용자 에이전트 매핑을 파라미터로 받음
file_path = Parameter('file_path')
user_agent_map = Parameter('user_agent_map')
# 엑셀 데이터 로드
excel_data = load_excel_data(file_path)
# 데이터를 청크로 나누어 처리 (예: 100개씩)
for start in range(0, len(excel_data), 100):
data_chunk = excel_data[start:start+100]
# 데이터 전처리 및 API 요청
results = process_and_request(data_chunk, user_agent_map)
# 결과 저장
save_results(file_path, results)
# 필요시 플로우 스케줄링
# flow.schedule = Schedule(cron="0 0 * * *") # 매일 자정에 실행
# 플로우 실행 (로컬에서 실행할 때)
if __name__ == "__main__":
flow.run(parameters={
'file_path': 'data.xlsx',
'user_agent_map': {1: 'TypeA', 2: 'TypeB'}
})