본문 바로가기

Cloud Providers/AWS

Opensearch로 Cloudfront의 access_log를 실시간으로 분석해보자

💡 참고자료
[AWS 공식블로그]

Kinesis Data Stream 생성


  1. Kinesis 콘솔
  2. 데이터 스트림 생성
  3. 이름 cloudfront-real-time-log-data-stream 지정
  4. 온디멘드 요금 설정
  5. 성공

Cloud Front 배포, 실시간 로그 활성화


  1. Cloud Formation 활용하여 손쉽게 배포
  2. 생성된 S3에 정적파일 업로드
  3. Cloud Front 콘솔
  4. 원격측정 > 로그
  5. 실시간 로그 구성 > 구성 생성
  6. 이름 CloudFrontRealTimeConfigName
  7. 샘플링 속도 100
  8. 원하는 필드 선택
  9. 생성한 Kinesis data stream ARN 등록
  10. 배포, 기본 캐싱 설정
  11. 성공

Kinesis Firehose에서 로그 처리용 Lambda 함수 생성하기


  1. 람다 콘솔
  2. 함수 생성
  3. 이름 cf-real-time-logs-transformer
  4. 런타임 Python 3.8
  5. 기본 람다 권한을 가진 새로운 역할 생성!
  6. 함수 코드
import base64
    import json

    print('Loading function')

    def lambda_handler(event, context):
        output = []

        # Based on the fields chosen during the creation of the 
        # Real-time log configuration.
        # The order is important and please adjust the function if you have removed 
        # certain default fields from the configuration. 
        realtimelog_fields_dict = {
        "timestamp" : "float", 
        "c-ip" : "str", 
        "time-to-first-byte" : "float", 
        "sc-status" : "int", 
        "sc-bytes" : "int", 
        "cs-method" : "str", 
        "cs-protocol" : "str",
        "cs-host" : "str", 
        "cs-uri-stem" : "str", 
        "cs-bytes" : "int",
        "x-edge-location" : "str", 
        "x-edge-request-id" : "str", 
        "x-host-header" : "str", 
        "time-taken" : "float", 
        "cs-protocol-version" : "str",
        "c-ip-version" : "str", 
        "cs-user-agent" : "str", 
        "cs-referer" : "str", 
        "cs-cookie" : "str", 
        "cs-uri-query" : "str", 
        "x-edge-response-result-type" : "str", 
        "x-forwarded-for" : "str", 
        "ssl-protocol" : "str", 
        "ssl-cipher" : "str", 
        "x-edge-result-type" : "str", 
        "fle-encrypted-fields": "str", 
        "fle-status" : "str",
        "sc-content-type" : "str", 
        "sc-content-len" : "int", 
        "sc-range-start" : "int", 
        "sc-range-end" : "int", 
        "c-port" : "int", 
        "x-edge-detailed-result-type" : "str", 
        "c-country" : "str", 
        "cs-accept-encoding" : "str",
        "cs-accept" : "str",
        "cache-behavior-path-pattern" : "str",
        "cs-headers" : "str", 
        "cs-header-names" : "str", 
        "cs-headers-count" : "int"
        }

        for record in event['records']:

            # Extracting the record data in bytes and base64 decoding it
            payload_in_bytes = base64.b64decode(record['data'])

            # Converting the bytes payload to string
            payload = "".join(map(chr, payload_in_bytes))

            # dictionary where all the field and record value pairing will end up
            payload_dict = {}

            # counter to iterate over the record fields
            counter = 0

            # generate list from the tab-delimited log entry
            payload_list = payload.strip().split('t')

            # perform the field, value pairing and any necessary type casting.
            # possible types are: int, float and str (default)
            for field, field_type in realtimelog_fields_dict.items():
                #overwrite field_type if absent or '-'
                if(payload_list[counter].strip() == '-'):
                    field_type = "str"
                if(field_type == "int"):
                    payload_dict[field] = int(payload_list[counter].strip())
                elif(field_type == "float"):
                    payload_dict[field] = float(payload_list[counter].strip())
                else:
                    payload_dict[field] = payload_list[counter].strip()
                counter = counter + 1

            # JSON version of the dictionary type
            payload_json = json.dumps(payload_dict)

            # Preparing JSON payload to push back to Firehose
            payload_json_ascii = payload_json.encode('ascii')
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(payload_json_ascii).decode("utf-8")
            }
            output.append(output_record)

        print('Successfully processed {} records.'.format(len(event['records'])))

        return {'records': output}

제한 시간 3초 → 1분 변경

Opensearch 생성 및 백업 S3 버킷 생성하기


  1. Opensearch 콘솔
  2. 도메인 생성
  3. 이름 cf-realtime-log-es-domain
  4. 배포 유형: 개발 및 테스트
  5. 데이터 노드: 최소 사양
  6. 퍼블릭 액세스 허용: 테스트 용이므로
  7. 세분화된 액세스 제어 활성화
  8. 마스터 이름, 암호 생성 → Kibana 접속시 사용
  9. 생성
  10. 성공

Kinesis Data Firehose를 통해 분석 파이프 라인 설정


  1. Kinesis 콘솔
  2. 전송 스트림 생성
  3. source로 Kinesis, Destination으로 Opensearch 선택
  4. 이름 cloudfront-real-time-log-data-kinesis-firehose-consumer
  5. Transform record 활성화 후, 아까만든 람다 지정
  6. Opensearch Index에 realtime 입력
  7. S3 백업 지정
  8. 성공

Opensearch에서 Kinesis Data Firehose 서비스 설정하기


  1. url로 대시보드 접근
  2. 마스터 이름, 암호 사용