본문 바로가기

AWS

[AWS] Multi Account에서 IAM 내역 감사하기 #4 - Lambda를 활용한 Slack Notification

반응형

아래의 글과 대략적인 흐름은 비슷하다.

 

https://nyyang.tistory.com/126

 

[Lambda] IAM, SG 변경 사항을 Slack으로 알람 받기

개요 IAM 혹은 Security Group에서의 변경 사항이 감지되면 Slack으로 통보를 받고 싶습니다. ⇒ Notification을 받는 방법은 여러 가지가 있을 것이다. AWS Config를 통해 감지할 수도 있고, 필자의 방식대로

nyyang.tistory.com

 

1. 아키텍처

1. CloudTrail 추적을 활성화하면 Default EventBus에서 API가 감지된다.

2. Event Rule에서 Event Pattern에 의해 API를 필터링하면 Default EventBus에서 Capture 된다. (Security Account의 Event Rule에서도 동일하게 Security Account의 Custom EvnetBus로 보낸다.)

3. Target으로 Security Account의 Custom EventBus로 보낸다.
4. Custom EventBus로 들어오는 API에 대해 모두 Target으로 SNS로 보낸다.

5. SNS는 Lambda를 Trigger하며 Lambda로 event를 Parameter로 전달한다.

6. Lambda에서는 특정 Logic에 의해 Slack으로 Notificaiton을 보낸다.

 

2. 설명

 

1. Event bus는 Custom하게 생성한 Event Bus로 설정하고, 그 Event Bus에 흐르는 API를 포착하기 위해 Event Rule을 생성한다.

 

2. Target 중 1개는 SNS Topic으로 보내고, 나머지 1개는 OpenSearch로 보내기 위한 CloudWatch logs 이다.

 

EventBus는 별도로 생성해주었다.

 

3. SNS는 Lambda를 Trigger하도록 설정되어 있다.

 

Lambda Runtime은 Python을 사용하였다.

 

4. Slack Notification을 보내기 위한 Lambda Code는 아래와 같다.

중복되는 부분이 있지만, 크게 신경쓰지 않고 코드를 작성하였다.

 

개발자분들이 보기에는 굉장히 간단한 스크립트겠다만, 인프라 운영을 주로 하는 AWS 관리자들의 경우 한번에 보기 힘들 수 있기 때문에 간략히 설명을 하겠음

 

1) Lambda의 event로 json 형식의 data가 전달된다.

2) 원하는 Content를 적절히 Parsing하고 Slack Webhook URL로 Notification을 보낸다.

 

[1] Code에 SLACK Webhook URL을 하드코딩하기 싫어 Lambda 환경변수에 등록하고 가져오는 방식으로 하였다.

[2] Event Rule에서 Slack으로 전달되는 것 중 Alarm을 받고 싶은 특정 API만 지정하기 위해 배열을 하나 생성해주었으며, Noti를 보낼 때 아래의 IAM에 매칭이 될 경우에만 보냄

 

[3] 3개의 Func를 만들어주었는데, Slack Message를 보내는 놈 1개, 위의 [2]에 해당하는지 확인하는 용도 1개, Account ID를 보기 좋게 Account Name으로 변환해주는 용도 1개이다.

 

[4]

iam_event란 변수는 SNS Message로부터 전달받은 IAM Json Data를 저장하며,

그 외에 필요한 부분(source_ip, event_time, aws_region, event_name) 등을 Parsing한다.

 

여기서 중요한 부분이 iam user냐, iam role에 따라서 좀 달라지는데 각각 userIdentity에서 보여지는 Data가 좀 다르게 나온다. 이러한 이유 때문에 user_arn을 가져왔으며, 이렇게 할 경우 AssumedRole로 접속한 AD 계정 혹은 IAM User가 누구인지도 확인이 가능하다.

 

ct_url은 CloudTrail URL을 바로 Slack에서 링크로 제공할 수 있도록 한 부분이다.

 

current_evnet는 Request Parameter를 확인하기 위한 부분이다. 이 부분은 IAM Policy를 추가할 때 어떤 Policy들을 추가하는지 확인할 수 있도록 해준다. (말 그대로 요청 파라미터임)

 

추가로 EventName에 Delete, Detach, Revoke 등의 단어가 들어갈 경우 빨간색으로 처리하도록 하였다.

 

[5] check_event_notification의 함수를 통해 내가 원하는 IAM Event일 경우 Slack으로 Notification을 보내게 된다.

[6] Request Parameter가 너무 길 경우 (IAM Policy가 50줄이 넘거나 등등...) Slack으로 Notification이 전달되지 않을 수 있다. (400 Error)

 

이런 경우에는 Request Parameter를 지우고 다시 보낸다. (응답 코드가 200이 아닐 경우에는 다시 보낸다. 물론 다른 Issue로 인해 안보내지는 경우도 있겠다만 우선 내가 확인한 버그는 위와 같았다. 이슈는 CloudWatch Logs를 통해 확인할 수 있었고, 추가적으로 구글링을 하니 Block이 50줄 이상일 경우 400 Error를 발생한다고 하였음)

 

 

Full Code는 아래와 같다.

 

import json
import logging
import os
import urllib3

http = urllib3.PoolManager()

# Reference
# https://blog.nodeswat.com/simple-node-js-and-slack-webhook-integration-d87c95aa9600
   
# Get values from Environments variables
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
HOOK_URL = os.environ['HOOK_URL']

NOTIFICATION_IAM = [
    "AttachGroupPolicy",
    "AttachRolePolicy",
    "AttachUserPolicy",
    "ChangePassword",
    "CreateAccessKey",
    "CreateGroup",
    "CreatePolicy",
    "CreateRole",
    "CreateUser",
    "DeleteAccessKey",
    "DeleteGroup",
    "DeleteGroupPolicy",
    "DeletePolicy",
    "DeleteRole",
    "DeleteRolePolicy",
    "DeleteUser",
    "DeleteUserPolicy",
    "DetachGroupPolicy",
    "DetachRolePolicy",
    "DetachUserPolicy",
    "PutGroupPolicy",
    "PutRolePolicy",
    "PutUserPolicy",
    "CreatePolicyVersion"
    ]

ACCOUNT_INFO = {
    "000000000000": "A",
    "111111111111": "B",
    "222222222222": "C",
    "333333333333": "D",
    "444444444444": "E",
    "555555555555": "F",
    "666666666666": "G"
}

   
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def send_message(message):
    
    data = json.dumps(message).encode('utf-8')
    
    res = http.request(
        method='POST',
        url=HOOK_URL,
        body=data,
        headers={"Content-Type": "application/json"}
    )

    print(res.data, res.status)
    
    return res.status

    
def check_event_notification(api_event):
    
    if api_event in NOTIFICATION_IAM:
        print(f"{api_event} Event Will be sent to Slack Notification Channel.")
        return True
        
    print(f"{api_event} Event does not need to send Slack Notification.")
    return False


def translate_account_id_to_name(id):
    account_name = f"{ACCOUNT_INFO.get(id, '')} ({id})"
    
    return account_name
    
   
def lambda_handler(event, context):
    
    iam_event = json.loads(event.get('Records')[0].get('Sns').get('Message'))
    
    logger.info("Event        : " + str(event))
    
    # 정보
    event_id = iam_event['detail']['eventID']
    
      
    # 주체
    # user_name = iam_event['detail']['userIdentity']['userName']

    # 이벤트 내역
    source_ip = iam_event['detail']['sourceIPAddress']  # AWS Internal, ..
    event_time = iam_event['detail']['eventTime']
    event_name = iam_event['detail']['eventName']
    aws_region = iam_event['detail']['awsRegion']
    event_request_parameters = iam_event['detail']['requestParameters']
    account_id = iam_event['account']
    
    translated_account_name = translate_account_id_to_name(account_id)
    
    changed_resource = "IAM"
    
    # UserIdentity
    user_identity = iam_event['detail']['userIdentity']
    try:
        user_type = user_identity['type']   # IAMUser, AssumeRole, ...
        user_arn = user_identity['arn']
        print(f"user_type : {user_type}, arn : {user_arn}")
    except Exception as e:
        print(f"{user_identity} is something wrong.")
    
    
    # cloudtrail url
    ct_url = f"https://{aws_region}.console.aws.amazon.com/cloudtrail/home?region={aws_region}#/events/{event_id}"
    
    print(ct_url)
    
    current_event = ""
    
    temp_count = 0
    for key, value in event_request_parameters.items():
        print(f"Current Key Count : {temp_count}, Key : {key}, Value : {value}")
        current_event += f"{key} : {value}\n"
        
        temp_count += 1
    
    
    logger.info("SLACK Channel: " + SLACK_CHANNEL)
    logger.info("HOOK URL     : " + HOOK_URL)
    
    check_list = ["Delete", "Detach", "Revoke"]
    if any(x in event_name for x in check_list):
        color = "#eb4034"
    else:
        color = "#0c3f7d"
    
    # color = "#eb4034" if event_name.find("delete") >= 0 else "#0c3f7d"
    
    if check_event_notification(event_name):
        slack_message = {
            "channel": SLACK_CHANNEL,
            "text": f"{changed_resource} 변경 사항이 감지되었습니다.",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": '*'+ changed_resource + ' 변경 사항이 감지되었습니다.*\n\n*Request Parameters*'
                    }
                },
                # {"type": "divider"},
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": current_event + '\n'
                    }
                },
                {"type": "divider"}
            ],
            "attachments": [{
                "fallback": "Fallback 입니다.",
                "color": color,
                "blocks": [
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": '*유저 arn*\n' + user_arn
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*유저 유형*\n' + user_type
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*소스 IP*\n' + source_ip + ' (<https://ko.infobyip.com/ip-' + source_ip + '.html|*IP 위치 조회*>)'
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*이벤트 시간 (UTC)*\n' + event_time
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*이벤트 이름*\n' + event_name
                        },
                        {
                            "type": "mrkdwn",
                            "text": '*Account 이름*\n' + translated_account_name
                        }
                    ]
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {
                                "type": "plain_text",
                                "text": "CloudTrail 확인 :waving_white_flag:"
                            },
                            "style": "primary",
                            "url": ct_url
                        }
                    ]
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": "CloudTrail 확인을 위해서 " + translated_account_name + " Account 로그인 세션이 있어야 하며, \nNotification 발생 후 2~3분 지난 뒤 조회가 가능합니다."
                    }
                },
                {
                "type": "divider"
                }
                ]
            }]
        }
        
        logger.info("Slack Message        : " + str(slack_message))
        
        response_result = send_message(slack_message)
        
        if response_result != 200:
            slack_message = {
                "channel": SLACK_CHANNEL,
                "text": f"{changed_resource} 변경 사항이 감지되었습니다.",
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": '*'+ changed_resource + ' 변경 사항이 감지되었습니다.*\n\n*Request Parameters*'
                        }
                    },
                    # {"type": "divider"},
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": "Parameters 길이가 길어 생략되었습니다." + '\n'
                        }
                    },
                    {"type": "divider"}
                ],
                "attachments": [{
                    "fallback": "Fallback 입니다.",
                    "color": color,
                    "blocks": [
                    {
                        "type": "section",
                        "fields": [
                            {
                                "type": "mrkdwn",
                                "text": '*유저 arn*\n' + user_arn
                            },
                            {
                                "type": "mrkdwn",
                                "text": '*유저 유형*\n' + user_type
                            },
                            {
                                "type": "mrkdwn",
                                "text": '*소스 IP*\n' + source_ip + ' (<https://ko.infobyip.com/ip-' + source_ip + '.html|*IP 위치 조회*>)'
                            },
                            {
                                "type": "mrkdwn",
                                "text": '*이벤트 시간 (UTC)*\n' + event_time
                            },
                            {
                                "type": "mrkdwn",
                                "text": '*이벤트 이름*\n' + event_name
                            },
                            {
                                "type": "mrkdwn",
                                "text": '*Account 이름*\n' + translated_account_name
                            }
                        ]
                    },
                    {
                        "type": "actions",
                        "elements": [
                            {
                                "type": "button",
                                "text": {
                                    "type": "plain_text",
                                    "text": "CloudTrail 확인 :waving_white_flag:"
                                },
                                "style": "primary",
                                "url": ct_url
                            }
                        ]
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": "CloudTrail 확인을 위해서 " + translated_account_name + " Account 로그인 세션이 있어야 하며, \nNotification 발생 후 2~3분 지난 뒤 조회가 가능합니다."
                        }
                    },
                    {
                    "type": "divider"
                    }
                    ]
                }]
            }
        
            response_result = send_message(slack_message)
            
            print(f"Final : {response_result}")

 

 

Slack으로 오는 알람은 아래와 같다.

 

그 다음에는 어떻게 CloudWatch Logs에서 OpenSearch로 보낼 수 있는지에 대해 확인해 볼 예정이다.

반응형