-
Notifications
You must be signed in to change notification settings - Fork 420
/
Copy pathcloud_watch_logs_event.py
106 lines (81 loc) · 3.42 KB
/
cloud_watch_logs_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import base64
import zlib
from typing import Dict, List, Optional
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, EventWrapper
class CloudWatchLogsLogEvent(EventWrapper):
    """Accessor wrapper around one log event record of a CloudWatch Logs payload."""

    @property
    def get_id(self) -> str:
        """Return the `id` value, a unique identifier for every log event."""
        # Exposed as `get_id` because a plain `id` clashes with the Python builtin.
        event_id = self["id"]
        return event_id

    @property
    def timestamp(self) -> int:
        """Return the value stored under the `timestamp` key."""
        event_timestamp = self["timestamp"]
        return event_timestamp

    @property
    def message(self) -> str:
        """Return the value stored under the `message` key."""
        event_message = self["message"]
        return event_message

    @property
    def extracted_fields(self) -> Optional[Dict[str, str]]:
        """Return the `extractedFields` value via `get` (lookup tolerates a missing key)."""
        fields = self.get("extractedFields")
        return fields
class CloudWatchLogsDecodedData(DictWrapper):
    """Accessor wrapper around the decoded CloudWatch Logs data document."""

    @property
    def owner(self) -> str:
        """AWS Account ID that the originating log data belongs to (`owner` key)."""
        account_id = self["owner"]
        return account_id

    @property
    def log_group(self) -> str:
        """Name of the log group of the originating log data (`logGroup` key)."""
        group_name = self["logGroup"]
        return group_name

    @property
    def log_stream(self) -> str:
        """Name of the log stream of the originating log data (`logStream` key)."""
        stream_name = self["logStream"]
        return stream_name

    @property
    def subscription_filters(self) -> List[str]:
        """Names of the subscription filters that matched the originating log data."""
        filter_names = self["subscriptionFilters"]
        return filter_names

    @property
    def message_type(self) -> str:
        """Kind of message carried by this payload (`messageType` key).

        Data messages use the "DATA_MESSAGE" type. CloudWatch Logs may sometimes
        emit Kinesis records with a "CONTROL_MESSAGE" type, mainly to check
        whether the destination is reachable.
        """
        kind = self["messageType"]
        return kind

    @property
    def policy_level(self) -> Optional[str]:
        """Level at which the policy was enforced; read via `get` from `policyLevel`."""
        level = self.get("policyLevel")
        return level

    @property
    def log_events(self) -> List[CloudWatchLogsLogEvent]:
        """The actual log data, as a list of wrapped log event records.

        Every entry of `logEvents` is wrapped in a `CloudWatchLogsLogEvent`;
        each record's ID property is a unique identifier for that log event.
        """
        raw_events = self["logEvents"]
        return list(map(CloudWatchLogsLogEvent, raw_events))
class CloudWatchLogsEvent(EventWrapper):
    """CloudWatch Logs log stream event

    A Lambda function can be used to monitor and analyze logs from an
    Amazon CloudWatch Logs log stream.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html
    """

    # Caches for the decompressed bytes and the parsed JSON; None until first use.
    _decompressed_logs_data = None
    _json_logs_data = None

    @property
    def raw_logs_data(self) -> str:
        """Return the still-encoded `data` value stored under the `awslogs` key."""
        awslogs = self["awslogs"]
        return awslogs["data"]

    @property
    def decompress_logs_data(self) -> bytes:
        """Base64-decode and decompress the log data, reusing a cached result."""
        if self._decompressed_logs_data is not None:
            return self._decompressed_logs_data

        compressed = base64.b64decode(self.raw_logs_data)
        # MAX_WBITS | 32 asks zlib to auto-detect a zlib or gzip header.
        self._decompressed_logs_data = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
        return self._decompressed_logs_data

    def parse_logs_data(self) -> CloudWatchLogsDecodedData:
        """Decode, decompress and deserialize the log data.

        The deserialized JSON is cached after the first call; every call
        returns a new `CloudWatchLogsDecodedData` wrapping that cached value.
        """
        if self._json_logs_data is None:
            json_text = self.decompress_logs_data.decode("UTF-8")
            self._json_logs_data = self._json_deserializer(json_text)

        return CloudWatchLogsDecodedData(self._json_logs_data)