본문 바로가기

AWS

[Lambda] CW Logs 구독 필터와 Lambda 연결, Slack으로 인스턴스 스케줄러 동작 내역 확인하기

반응형

1. 배경

AWS Instance Scheduler로 EC2와 RDS를 정해진 시간에 Start하고, Stop한다고 가정해보자.

 

관리자들은 어떠한 서버들이 주기적으로 켜지고 꺼지는지 알아야 할 필요성이 있다.

 

만약 이러한 내용들을 Slack을 통해 Notification을 받고 싶다면 어떻게 해야 할까?

 

Keyword : CloudWatch Logs, Subscription filter, Lambda, Slack

 

 

2. 아이디어 및 아키텍처

우선, 인스턴스 스케줄러는 CloudFormation으로 간단하게 EC2와 RDS를 스케줄링 할 수 있도록 도와주는 툴?이다. AWS 측에서 개발하였으며, Lambda와 CloudWatch, DynamoDB 등을 활용하여 사용자가 정의한 대로 스케줄링 하도록 도와준다.

 

Idea는 다음과 같다

 

1. 인스턴스 스케줄러가 어떠한 작업들을 수행하였는지 CloudWatch Logs에서 확인할 수 있다.


2. Log Stream에 쌓이는 로그 중 어떤 인스턴스들이 켜지고, 꺼지는지 확인할 수 있을 것이다.
    => 특정 Log만 필터를 걸어서 Lambda로 보내기

 

3. Lambda에서는 받은 Data를 적절하게 Parsing하여, Slack Channel으로 전송한다.

 

 

개념 정리

 

- 구독 필터 : Kinesis, Lambda 또는 Kinesis Data Firehose에서 구독 필터를 사용할 수 있으며, 구독 필터를 통해 수신 서비스로 전송되는 Log는 Base64로 인코딩되고 gzip 형식으로 압축된다.

 

      ⇒ Lambda로 수신하게 되면 Log는 디코딩 및 압축 해제하는 과정이 필요할 것으로 예상된다.

 

- 인스턴스 스케줄러 : 사전 정의된 시간에 인스턴스를 중지하고 시작하여 EC2 및 RDS의 사용량 및 요금을 줄이고 싶을 때 사용하며, CloudFormation Stack으로 한 번에 배포할 수 있다. 내부적인 동작은 Lambda, DynamoDB, Tag, CloudWatch 등을 통해 동작하게 된다.

 

Ref : https://aws.amazon.com/ko/premiumsupport/knowledge-center/stop-start-instance-scheduler/

 


아키텍처

Subscription Filter를 걸어서, Lambda로 보내고, Lambda에서 Slack으로 Notification을 보내면 될 것이다.

 

 

3. 과정

 

구독 필터를 적용하고, Lambda를 보내고 이를 Lambda에서 처리하여 Slack까지 전송하는 전체적인 과정을 정리해본다.
방법은 은근 간단하기 때문에 간략하게 설명하도록 한다.

가정

1) 이미 인스턴스 스케줄러가 설정되어 있고, 정기적으로 특정 시간대에 인스턴스가 켜지고, 꺼지고 있다고 가정한다.

2) Slack Channel이 존재하며, 해당 Slack Channel에 대한 Hook URL을 저장해 놓았다.

3) Lambda는 Python3으로 런타임을 설정하며, 기 생성해 놓았다고 가정한다.

 

 

 

1. Subscription Filter 적용

 

원하는 Log Group을 클릭한 뒤 'Create' → Create Lambda subscription filter를 클릭한다.

 

2. 'Subscription filter patter'에 필터링할 패턴 정의

 

"INFO: Scheduler result"란 Text가 포함된 Line을 필터링하고 싶기 때문에 아래와 같이 필터링 패턴을 정의해주었다.

filter name은 임의로 지정해주고, 'Test Pattern'을 수행하면 원하는 결과값이 나온 것을 확인할 수 있다.

 

 

3. Lambda 확인

Lambda에서는 정상적으로 CloudWatch Logs - 구독 필터에 의해 Trigger가 설정된 것을 확인할 수 있다.

 

++++ 추가 설정 부분

 

Lambda에 부착되어 있는 Role에 Policy를 추가해주어야 함!

(Instance Scheduler는 instance_id만 알려주기 때문에, Name 또한 Lambda 내에서 확인하기 위해서 DescribeInstances 권한을 부여해주었다.)

 

 

4. Lambda 환경 변수 설정

Lambda 소스 코드 내에서 사용하기 위한 환경 변수를 설정해준다.

 

 

5. Code 정의

구독 필터를 통해 수신 서비스로 전송되는 로그는 Base64로 인코딩되고 gzip 형식으로 압축되기 때문에, 이를 일반적인 Text로 변환해주어야 한다.

 

아래와 유사한 스택오버플로우 글들을 읽으면 잘 이해가 될 것임

 

Ref : https://stackoverflow.com/questions/50295838/cloudwatch-logs-stream-to-lambda-python

https://medium.com/@NotSoCoolCoder/parsing-subscription-filter-data-in-aws-lambda-27d0e3a09247

 

 

{ACCOUNT_NUMBER} 인 부분은 본인 Account Number에 맞게 치환해주면 된다.

 

import gzip
import json
import base64
import os
import urllib3
import boto3
import sys

# Get values from Environments variables
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
HOOK_URL = os.environ['HOOK_URL']


http = urllib3.PoolManager()

def send_message(slack_message):
    print(slack_message)

    data = json.dumps(slack_message).encode('utf-8')

    res = http.request(
        method='POST',
        url=HOOK_URL,
        body=data
    )

    print(res.data, res.status)


def lambda_handler(event, context):
    
    cw_data = event['awslogs']['data']
    
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    payload = json.loads(uncompressed_payload)
    
    log_events = payload['logEvents']
    
    instance_message = ""
    
    for log_event in log_events:

        strings = json.loads(log_event['message'].split('Scheduler result ')[1].replace("'", "\""))
        
        print(f"{strings.get('{ACCOUNT_NUMBER}')}, {type(strings)}")
        
        if strings.get('{ACCOUNT_NUMBER}').get('started'):
            print(f"시작 인스턴스로 진입")
            instance_message += f"*started 인스턴스 내역*\n"
            for region, instances in strings.get('{ACCOUNT_NUMBER}').get('started').items():
                print(region, instances)
                # 각각의 리전에 대해 인스턴스 내역 출력
                instance_message += f"*Region : {region}*\n"
                
                instance_list = list()
                for instance in instances:
                    for i in list(instance.keys()):
                        instance_list.append(i)
                
                client = boto3.client('ec2', region_name=region)
                
                for reservation in client.describe_instances(
                        InstanceIds=instance_list
                    ).get('Reservations'):
                    print(reservation)
                    for inst in reservation.get('Instances'):
                        instance_name = ''
                        for tag in inst.get('Tags'):
                            if tag.get('Key') == 'Name':
                                instance_message += f"Name : {tag.get('Value')}, ({inst.get('InstanceId')})\n"
                                break
                            
        elif strings.get('{ACCOUNT_NUMBER}').get('stopped'):
            print(f"중지 인스턴스로 진입")
            instance_message += f"*stopped 인스턴스 내역*\n"
            for region, instances in strings.get('{ACCOUNT_NUMBER}').get('stopped').items():
                print(region, instances)
                # 각각의 리전에 대해 인스턴스 내역 출력
                instance_message += f"*Region* : {region}\n"
                
                instance_list = list()
                for instance in instances:
                    for i in list(instance.keys()):
                        instance_list.append(i)
                
                client = boto3.client('ec2', region_name=region)
                
                for reservation in client.describe_instances(
                        InstanceIds=instance_list
                    ).get('Reservations'):
                    print(reservation)
                    for inst in reservation.get('Instances'):
                        instance_name = ''
                        for tag in inst.get('Tags'):
                            if tag.get('Key') == 'Name':
                                instance_message += f"*Name* : {tag.get('Value')}, ({inst.get('InstanceId')})\n"
                                break
    
    print(instance_message)
                            
    if "stopped" in instance_message and "started" in instance_message:
        scheduling_type = "STARTED AND STOPPED"
        color = "#458DCB"
    elif "stopped" in instance_message:
        scheduling_type = "STOPPED"
        color = "#CB4557"
    elif "started" in instance_message:
        scheduling_type = "STARTED"
        color = "#458DCB"
    else:
        sys.exit("종료되거나 시작된 인스턴스가 없습니다.")

    message = {
        "channel": SLACK_CHANNEL,
        "text": f"스케줄러 : {scheduling_type} 인스턴스 내역입니다.",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "팀 계정 스케줄러에 의해 작동된 인스턴스 내역입니다."
                }
            }
        ],
        "attachments": [
            {
                "fallback": "Fallback 입니다.",
                "color": color,
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"{instance_message}"
                        }
                    }
                ]
            }
        ]
    }
    
    print(message)


    send_message(message)

 

6. Slack 메시지 확인

중지 인스턴스일 경우 빨간색으로 설정하였으며, 시작 인스턴스일 경우 파랑색으로 설정해주었다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

반응형