💡 참고자료
[AWS 공식블로그]

Kinesis Data Stream 생성
- Kinesis 콘솔
- 데이터 스트림 생성
- 이름
지정 - 온디멘드 요금 설정
- 성공
Cloud Front 배포, 실시간 로그 활성화
- Cloud Formation 활용하여 손쉽게 배포
- 생성된 S3에 정적파일 업로드
- Cloud Front 콘솔
- 원격측정 > 로그
- 실시간 로그 구성 > 구성 생성
- 이름
- 샘플링 속도 100
- 원하는 필드 선택
- 생성한 Kinesis data stream ARN 등록
- 배포, 기본 캐싱 설정
- 성공
Kinesis Firehose에서 로그 처리용 Lambda 함수 생성하기
- 람다 콘솔
- 함수 생성
- 이름
- 런타임 Python 3.8
- 기본 람다 권한을 가진 새로운 역할 생성!
- 함수 코드
import base64
import json
print('Loading function')
def lambda_handler(event, context):
output = []
# Based on the fields chosen during the creation of the
# Real-time log configuration.
# The order is important and please adjust the function if you have removed
# certain default fields from the configuration.
realtimelog_fields_dict = {
"timestamp" : "float",
"c-ip" : "str",
"time-to-first-byte" : "float",
"sc-status" : "int",
"sc-bytes" : "int",
"cs-method" : "str",
"cs-protocol" : "str",
"cs-host" : "str",
"cs-uri-stem" : "str",
"cs-bytes" : "int",
"x-edge-location" : "str",
"x-edge-request-id" : "str",
"x-host-header" : "str",
"time-taken" : "float",
"cs-protocol-version" : "str",
"c-ip-version" : "str",
"cs-user-agent" : "str",
"cs-referer" : "str",
"cs-cookie" : "str",
"cs-uri-query" : "str",
"x-edge-response-result-type" : "str",
"x-forwarded-for" : "str",
"ssl-protocol" : "str",
"ssl-cipher" : "str",
"x-edge-result-type" : "str",
"fle-encrypted-fields": "str",
"fle-status" : "str",
"sc-content-type" : "str",
"sc-content-len" : "int",
"sc-range-start" : "int",
"sc-range-end" : "int",
"c-port" : "int",
"x-edge-detailed-result-type" : "str",
"c-country" : "str",
"cs-accept-encoding" : "str",
"cs-accept" : "str",
"cache-behavior-path-pattern" : "str",
"cs-headers" : "str",
"cs-header-names" : "str",
"cs-headers-count" : "int"
for record in event['records']:
# Extracting the record data in bytes and base64 decoding it
payload_in_bytes = base64.b64decode(record['data'])
# Converting the bytes payload to string
payload = "".join(map(chr, payload_in_bytes))
# dictionary where all the field and record value pairing will end up
payload_dict = {}
# counter to iterate over the record fields
counter = 0
# generate list from the tab-delimited log entry
payload_list = payload.strip().split('t')
# perform the field, value pairing and any necessary type casting.
# possible types are: int, float and str (default)
for field, field_type in realtimelog_fields_dict.items():
#overwrite field_type if absent or '-'
if(payload_list[counter].strip() == '-'):
field_type = "str"
if(field_type == "int"):
payload_dict[field] = int(payload_list[counter].strip())
elif(field_type == "float"):
payload_dict[field] = float(payload_list[counter].strip())
payload_dict[field] = payload_list[counter].strip()
counter = counter + 1
# JSON version of the dictionary type
payload_json = json.dumps(payload_dict)
# Preparing JSON payload to push back to Firehose
payload_json_ascii = payload_json.encode('ascii')
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(payload_json_ascii).decode("utf-8")
print('Successfully processed {} records.'.format(len(event['records'])))
return {'records': output}
제한 시간 3초 → 1분 변경
Opensearch 생성 및 백업 S3 버킷 생성하기
- Opensearch 콘솔
- 도메인 생성
- 이름
- 배포 유형: 개발 및 테스트
- 데이터 노드: 최소 사양
- 퍼블릭 액세스 허용: 테스트 용이므로
- 세분화된 액세스 제어 활성화
- 마스터 이름, 암호 생성 → Kibana 접속시 사용
- 생성
- 성공
Kinesis Data Firehose를 통해 분석 파이프 라인 설정
- Kinesis 콘솔
- 전송 스트림 생성
- source로 Kinesis, Destination으로 Opensearch 선택
- 이름
- Transform record 활성화 후, 아까만든 람다 지정
- Opensearch Index에 realtime 입력
- S3 백업 지정
- 성공
Opensearch에서 Kinesis Data Firehose 서비스 설정하기
- url로 대시보드 접근
- 마스터 이름, 암호 사용
'Cloud Providers > AWS' 카테고리의 다른 글
S3로 정적 호스팅 하기 (0) | 2022.07.27 |
[AWS]Cloud Trail로 계정 활동을 추적하자 (0) | 2022.07.04 |
[AWS]EC2 burting (0) | 2022.05.12 |
[lambda]Serverless Framework를 사용하여, 람다함수로 express 서버를 띄워보자 (0) | 2022.05.01 |
[AWS]Public, Private subnet과 VAT, internet gateway (0) | 2022.04.18 |