개요
ELT 패턴은 데이터 파이프라인에 이상적인 설계로, 처음 두단계인 데이터 추출(EXTRACT)과 데이터 로드(LOAD)를 모두 데이터 수집이라고 이야기합니다.
아래 내용에서는 데이터를 추출하고 로드하는 과정에 있어,
1) 개발 환경과 인프라를 설정하는 방법
2) 다양한 소스 시스템에서 데이터를 추출하는 방법
두 가지를 설명합니다.
내용의 대다수가 예제 코드로 구성되어 있고, 파이썬 환경설정부터 설명합니다. 이 부분은 생략하도록 하겠습니다.
전체 내용을 다이어그램으로 그려보면, 다음과 같습니다.
모든 예제에서는
1) 데이터베이스(mysql, mongo, pg)에서 데이터를 추출
2) 추출된 데이터를 S3 버켓으로 로드
3) S3에서 데이터 웨어하우스(Redshift)로 데이터 로드(수집 완료)
세 단계를 기반으로 합니다. 4장에서는 1, 2만 다루며 3의 경우 5장에서 이루어집니다.
데이터베이스에서 데이터를 추출하기 전에, 어떻게 하면 코드로 AWS의 S3와 데이터베이스를 연결할 수 있을지 살펴보겠습니다.
AWS: S3, IAM, Boto3
S3는 클라우드 객체 스토리지 서비스로, 사용자에게 원하는 양의 데이터를 저장하고 보호할 수 있습니다. 데이터 레이크, 프론트엔드 호스팅, 모바일 앱과 같은 모든 사용 사례를 지원합니다. 예제에서는 데이터베이스에서 데이터를 추출하여 csv 파일로 추출한 뒤, S3에 업로드합니다.
(데이터 레이크로 활용한다고 이해했습니다)
IAM은 AWS의 클라우드 서비스에 대한 접근 권한이라고 이해할 수 있습니다. IAM USER를 생성하고, USER에 ROLE을 부여하여 클라우드 서비스에 대한 접근 권한을 부여합니다.
위 링크에서는 S3를 생성하고, IAM 유저를 만들어 S3에 대한 ROLE(S3FullAccess)을 부여하는 방법을 설명합니다.
Boto3는 Python용 AWS SDK로, 파이썬 코드를 통해 AWS Service를 생성하고 매니징할 수 있게 해 줍니다.
csv 파일을 S3 버켓에 업로드하는 코드만 첨부하겠습니다.
parser = configparser.ConfigParser()
# IAM 유저 생성 후 발급되는 SECRET KEY, ACCESS KEY를 의미
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client('s3',
aws_access_key_id = access_key,
aws_secret_access_key = secret_key)
s3_file = '''csv file name'''
s3.upload_file('''csv file''', bucket_name, s3_file)
Python
복사
데이터베이스에서 추출된 csv 파일을 업로드하는 클라우드 설정에 대해 먼저 알아보았습니다. 다음은 데이터베이스 종류 별 데이터 추출 예제입니다.
Database: mysql
mysql 데이터베이스에서 데이터 추출은 두 가지 방법이 있습니다.
•
SQL을 사용한 전체 || 증분 추출
◦
구현하기 간단함
◦
대규모 데이터셋에서는 확장성이 떨어짐
•
이진 로그(binlog) 복제
◦
구현이 복잡함
◦
원본 테이블의 변경되는 데이터 볼륨이 크거나, 소스에서 데이터를 자주 수집해야 하는 경우 자주 사용됨
◦
스트리밍 데이터 수집을 수행하는 하나의 경로이기도 함
SQL을 사용한 전체 || 증분 추출
전체 추출 | 증분 추출 | |
장점 | 추출 작업을 실행할 때마다 테이블의 모든 레코드가 추출됨 | 추출 작업의 마지막 실행 이후 변경되거나 추가된 원본 테이블의 레코드만 추출됨
- 마지막 추출의 타임스탬프를 추출 작업 로그 테이블에 저장하거나, 대상 테이블에서 마지막 업데이트 열의 최대 타임스탬프를 쿼리하여 검색.
⇒ 최적의 성능에 이상적 |
단점 | 시간 오래걸림 | 1) 삭제된 행은 캡처되지 않고, 원본(Mysql)에서 행이 삭제되면 아무것도 변경되지 않은 것처럼 됨.
2) 신뢰할 수 있는 타임스탬프가 있어야 하는데, 열이 누락되거나 업데이트되지 않을 수 있음. |
MySQL 데이터의 이진 로그 복제
이진 로그란?
•
데이터베이스에서 수행된 모든 작업에 대한 기록을 보관하는 로그
•
다른 mysql 인스턴스로 데이터를 복제하는 것이 목적이지만, 이진 로그를 통해 데이터 웨어하우스로 보낼 데이터를 수집하기 용이함
CDC란?
•
CDC(변경 데이터 캡처)는 SQL Server 에이전트를 사용하여 테이블에 적용되는 삽입, 업데이트 및 삭제 작업을 기록
•
데이터 웨어하우스 내의 원본 테이블의 변경 내용을 반영해야 하지만, 원본 복제본을 고치는 기술은 적합하지 않기 때문에
•
구조화되고 안정적인 변경 데이터 스트림이 필요해서 CDC를 사용하게 됩니다.
구성 방법
1.
mysql 서버에서 이진 로그를 활성화 + 구성
2.
초기 전체 테이블 추출을 실행 + 로드
3.
지속적으로 이진 로그 추출
4.
추출된 이진 로그를 데이터 웨어하우스로 변환하여 로드
I. 활성화 상태 확인
select variable_value as bin_log_status
from performance_schema.global_variables
where variable_name='log_bin'
SQL
복사
II. 이진 로깅 형식 설정
•
STATEMENT: 이진 로그에 행을 삽입하거나 수정하는 행동들에 대해 SQL 문 자체를 기록
◦
데이터 웨어하우스 용도는 아님
•
ROW: 테이블의 행에 대한 모든 변경 사항이 행 자체의 데이터로 이진 로그 행에 표시됨
◦
기본 형식
•
MIXED: 이진 로그에 STATE 형식 레코드와 ROW 형식 레코드를 모두 기록
◦
DB 터지기 딱 좋음
III. 관련 정보 추출 + 데이터 웨어하우스에 로드할 파일(csv)에 저장
python mysql-replication 사용
# 세팅 후라고 가정
b_stream = BinLogStreamReader(
connection_settings = mysql_settings,
server_id=100,
only_events=[
row_event.DeleteRowsEvent,
row_event.WriteRowsEvent,
row_event.UpdateRowsEvent
]
)
for event in b_stream:
event.dump()
b_stream.close()
Python
복사
위의 코드를 실행하면, 이진 로그를 사람이 읽을 수 있는 형태로 추출하게 된다.
다음 과정은
1) 데이터를 다른 형식으로 파싱하고 기록(CSV 파일의 행에 각 이벤트를 기록)
2) 추출/로드 테이블 당 하나의 파일을 작성
3) S3 버켓에 업로드
위와 같은데, 코드 예제는 생략하겠습니다.
Database: PostgreSQL
마찬가지로 전체 추출 또는 증분 추출을 할 수 있습니다. 이는 위와 유사합니다. 또다른 방법으로는 Postgres WAL(Write-Ahead Log)를 데이터 스트림으로 변환하는 방법이 제시됩니다.
Postgres WAL
•
Debezium(오픈 소스 분산 플랫폼)을 사용하여 스트리밍하는 것을 제안
Database: Mongo
MongoDB는 대표적인 nosql 중 하나로, nosql중에서도 Document 형식을 갖고 있습니다. 몽고는 스키마와 유사한 Collection, Collection 아래에 자유롭게 저장할 수 있는 Document가 존재합니다. Document는 작성 시점에 타임 스탬프가 기록됩니다.
python pymongo, dnspython
from pymongo import MongoClient
import csv
import boto3
import datetime
from datetime import timedelta
import configparser
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name = parser.get("mongo_config",
"database")
collection_name = parser.get("mongo_config",
"collection")
mongo_client = MongoClient(
"mongodb+srv://" + username
+ ":" + password
+ "@" + hostname
+ "/" + database_name
+ "?retryWrites=true&"
+ "w=majority&ssl=true&"
+ "ssl_cert_reqs=CERT_NONE")
# connect to the db where the collection resides
mongo_db = mongo_client[database_name]
# choose the collection to query documents from
mongo_collection = mongo_db[collection_name]
start_date = datetime.datetime.today() + timedelta(days = -1)
end_date = start_date + timedelta(days = 1 )
mongo_query = { "$and":[{"event_timestamp" : { "$gte": start_date }}, {"event_timestamp" : { "$lt": end_date }}] }
event_docs = mongo_collection.find(mongo_query, batch_size=3000)
# create a blank list to store the results
all_events = []
# iterate through the cursor
for doc in event_docs:
# Include default values
event_id = str(doc.get("event_id", -1))
event_timestamp = doc.get(
"event_timestamp", None)
event_name = doc.get("event_name", None)
# add all the event properties into a list
current_event = []
current_event.append(event_id)
current_event.append(event_timestamp)
current_event.append(event_name)
# add the event to the final list of events
all_events.append(current_event)
export_file = "export_file.csv"
with open(export_file, 'w') as fp:
csvw = csv.writer(fp, delimiter='|')
csvw.writerows(all_events)
fp.close()
Python
복사
Database: REST API
조직에서 만들고 유지 관리하는 API 혹은 조직에서 사용하는 외부 서비스의 API에서 데이터를 수집해야 할 때도 있습니다. REST API를 활용한 데이터 추출에서는 공통적인 패턴이 있습니다.
1.
API 엔드포인트로 HTTP GET 요청을 보낸다
2.
JSON 형식일 가능성이 높은 응답을 수락한다.
3.
응답을 구문 분석하고 나중에 데이터 웨어하우스에 로드할 수 있는 CSV 파일로 변환한다.
예제에서는 Open Notify라는 우주 데이터 라이브러리를 호출합니다
lat = 42.36
lon = 71.05
lat_log_params = {"lat": lat, "lon": lon}
api_response = requests.get(
"http://api.open-notify.org/iss-pass.json", params=lat_log_params)
# create a json object from the response content
response_json = json.loads(api_response.content)
all_passes = []
for response in response_json['response']:
current_pass = []
#store the lat/log from the request
current_pass.append(lat)
current_pass.append(lon)
# store the duration and risetime of the pass
current_pass.append(response['duration'])
current_pass.append(response['risetime'])
all_passes.append(current_pass)
export_file = "export_file.csv"
with open(export_file, 'w') as fp:
csvw = csv.writer(fp, delimiter='|')
csvw.writerows(all_passes)
fp.close()
Python
복사
파이썬에서 해당 api 엔드포인트로 리퀘스트를 보낸 다음, 받아온 응답 값을 Json 형태로 파싱합니다.
파싱한 json 값을 마찬가지로 csv 파일로 만들어서, boto3에 올린다고 보시면 되겠습니다.
카프카 및 Debezium을 통한 스트리밍 데이터 수집
mysql 이진 로그 또는 pg WALs와 같은 CDC 시스템을 통해 데이터를 수집할 경우, 프레임워크의 도움이 필요합니다. 구현이 복잡하기 때문인데요.
Debezium은 여러 오픈 소스 서비스로 구성된 분산 시스템으로, 일반적인 CDC 시스템에서 행 수준 변경을 캡처한 후 다른 시스템에서 사용할 수 있는 이벤트로 스트리밍합니다.
설치에 필요한 세 가지 주요 구성 요소가 있습니다.
•
아파치 주키퍼: 분산 환경을 관리하고 각 서비스의 구성을 처리
•
아파치 카프카: 분산 스트리밍 플랫폼
•
아파치 카프카 커넥트: 데이터를 카프카를 통해 쉽게 스트리밍할 수 있도록 카프카를 다른 시스템과 연결. CDC 시스템의 데이터를 카프카 토픽으로 변환
카프카는 토픽 별로 정리된 메세지를 교환합니다. 하나의 시스템(sql 등)은 토픽에 publish(produce)할 수도 있고, 토픽을 consume 혹은 subscribe할 수도 있습니다.
Debezium은 이 시스템의 연결을 모두 관리할 수 있는 오픈소스라고 보시면 되겠습니다.
결론적으로 위의 이진 로그를 파이썬으로 직접 뽑아 쓰기 보다는 Debezium 써서 연결하는 방법을 배워라… 라고 정리할 수 있겠습니다.