Skip to content

Commit 705a703

Browse files
cumason123Curtis Mason
authored and
Curtis Mason
committed
Created CloudEvent class (cloudevents#36)
CloudEvents is a more pythonic interface for using cloud events. It is powered by internal marshallers and cloud event base classes. It performs basic validation on fields, and cloud event type checking. Signed-off-by: Curtis Mason <[email protected]> Signed-off-by: Dustin Ingram <[email protected]> Signed-off-by: Curtis Mason <[email protected]>
1 parent 922290c commit 705a703

File tree

7 files changed

+379
-4
lines changed

7 files changed

+379
-4
lines changed

cloudevents/sdk/event/base.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,21 @@
1616
import json
1717
import typing
1818

19+
_ce_required_fields = {
20+
'id',
21+
'source',
22+
'type',
23+
'specversion'
24+
}
25+
26+
27+
_ce_optional_fields = {
28+
'datacontenttype',
29+
'schema',
30+
'subject',
31+
'time'
32+
}
33+
1934

2035
# TODO(slinkydeveloper) is this really needed?
2136
class EventGetterSetter(object):
@@ -117,6 +132,7 @@ def MarshalJSON(self, data_marshaller: typing.Callable) -> typing.IO:
117132

118133
def UnmarshalJSON(self, b: typing.IO, data_unmarshaller: typing.Callable):
119134
raw_ce = json.load(b)
135+
120136
for name, value in raw_ce.items():
121137
if name == "data":
122138
value = data_unmarshaller(value)
@@ -134,7 +150,6 @@ def UnmarshalBinary(
134150
self.SetContentType(value)
135151
elif header.startswith("ce-"):
136152
self.Set(header[3:], value)
137-
138153
self.Set("data", data_unmarshaller(body))
139154

140155
def MarshalBinary(

cloudevents/sdk/http_events.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
import copy
15+
16+
import json
17+
import typing
18+
19+
from cloudevents.sdk import marshaller
20+
21+
from cloudevents.sdk.event import base
22+
from cloudevents.sdk.event import v03, v1
23+
24+
25+
class CloudEvent(base.BaseEvent):
26+
"""
27+
Python-friendly cloudevent class supporting v1 events
28+
Currently only supports binary content mode CloudEvents
29+
"""
30+
31+
def __init__(
32+
self,
33+
headers: dict,
34+
data: dict,
35+
data_unmarshaller: typing.Callable = lambda x: x
36+
):
37+
"""
38+
Event HTTP Constructor
39+
:param headers: a dict with HTTP headers
40+
e.g. {
41+
"content-type": "application/cloudevents+json",
42+
"ce-id": "16fb5f0b-211e-1102-3dfe-ea6e2806f124",
43+
"ce-source": "<event-source>",
44+
"ce-type": "cloudevent.event.type",
45+
"ce-specversion": "0.2"
46+
}
47+
:type headers: dict
48+
:param data: a dict to be stored inside Event
49+
:type data: dict
50+
:param binary: a bool indicating binary events
51+
:type binary: bool
52+
:param data_unmarshaller: callable function for reading/extracting data
53+
:type data_unmarshaller: typing.Callable
54+
"""
55+
headers = {key.lower(): value for key, value in headers.items()}
56+
data = {key.lower(): value for key, value in data.items()}
57+
event_version = CloudEvent.detect_event_version(headers, data)
58+
if CloudEvent.is_binary_cloud_event(headers):
59+
60+
# Headers validation for binary events
61+
for field in base._ce_required_fields:
62+
ce_prefixed_field = f"ce-{field}"
63+
64+
# Verify field exists else throw TypeError
65+
if ce_prefixed_field not in headers:
66+
raise TypeError(
67+
"parameter headers has no required attribute {0}"
68+
.format(
69+
ce_prefixed_field
70+
))
71+
72+
if not isinstance(headers[ce_prefixed_field], str):
73+
raise TypeError(
74+
"in parameter headers attribute "
75+
"{0} expected type str but found type {1}".format(
76+
ce_prefixed_field, type(headers[ce_prefixed_field])
77+
))
78+
79+
for field in base._ce_optional_fields:
80+
ce_prefixed_field = f"ce-{field}"
81+
if ce_prefixed_field in headers and not \
82+
isinstance(headers[ce_prefixed_field], str):
83+
raise TypeError(
84+
"in parameter headers attribute "
85+
"{0} expected type str but found type {1}".format(
86+
ce_prefixed_field, type(headers[ce_prefixed_field])
87+
))
88+
89+
else:
90+
# TODO: Support structured CloudEvents
91+
raise NotImplementedError
92+
93+
self.headers = copy.deepcopy(headers)
94+
self.data = copy.deepcopy(data)
95+
self.marshall = marshaller.NewDefaultHTTPMarshaller()
96+
self.event_handler = event_version()
97+
self.marshall.FromRequest(
98+
self.event_handler,
99+
self.headers,
100+
self.data,
101+
data_unmarshaller
102+
)
103+
104+
@staticmethod
105+
def is_binary_cloud_event(headers):
106+
for field in base._ce_required_fields:
107+
if f"ce-{field}" not in headers:
108+
return False
109+
return True
110+
111+
@staticmethod
112+
def detect_event_version(headers, data):
113+
"""
114+
Returns event handler depending on specversion within
115+
headers for binary cloudevents or within data for structured
116+
cloud events
117+
"""
118+
specversion = headers.get('ce-specversion', data.get('specversion'))
119+
if specversion == '1.0':
120+
return v1.Event
121+
elif specversion == '0.3':
122+
return v03.Event
123+
else:
124+
raise TypeError(f"specversion {specversion} "
125+
"currently unsupported")
126+
127+
def __repr__(self):
128+
return json.dumps(
129+
{
130+
'Event': {
131+
'headers': self.headers,
132+
'data': self.data
133+
}
134+
},
135+
indent=4
136+
)

cloudevents/tests/data.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
headers = {
2525
v03.Event: {
26-
"ce-specversion": "0.3",
26+
"ce-specversion": "1.0",
2727
"ce-type": ce_type,
2828
"ce-id": ce_id,
2929
"ce-time": eventTime,
@@ -42,7 +42,7 @@
4242

4343
json_ce = {
4444
v03.Event: {
45-
"specversion": "0.3",
45+
"specversion": "1.0",
4646
"type": ce_type,
4747
"id": ce_id,
4848
"time": eventTime,

cloudevents/tests/test_http_events.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
import json
15+
16+
import copy
17+
18+
from cloudevents.sdk.http_events import CloudEvent
19+
20+
from sanic import response
21+
from sanic import Sanic
22+
23+
import pytest
24+
25+
26+
invalid_test_headers = [
27+
{
28+
"ce-source": "<event-source>",
29+
"ce-type": "cloudevent.event.type",
30+
"ce-specversion": "1.0"
31+
}, {
32+
"ce-id": "my-id",
33+
"ce-type": "cloudevent.event.type",
34+
"ce-specversion": "1.0"
35+
}, {
36+
"ce-id": "my-id",
37+
"ce-source": "<event-source>",
38+
"ce-specversion": "1.0"
39+
}, {
40+
"ce-id": "my-id",
41+
"ce-source": "<event-source>",
42+
"ce-type": "cloudevent.event.type",
43+
}
44+
]
45+
46+
test_data = {
47+
"payload-content": "Hello World!"
48+
}
49+
50+
app = Sanic(__name__)
51+
52+
53+
def post(url, headers, json):
54+
return app.test_client.post(url, headers=headers, data=json)
55+
56+
57+
@app.route("/event", ["POST"])
58+
async def echo(request):
59+
assert isinstance(request.json, dict)
60+
event = CloudEvent(dict(request.headers), request.json)
61+
return response.text(json.dumps(event.data), headers=event.headers)
62+
63+
64+
@pytest.mark.parametrize("headers", invalid_test_headers)
65+
def test_invalid_binary_headers(headers):
66+
with pytest.raises((TypeError, NotImplementedError)):
67+
# CloudEvent constructor throws TypeError if missing required field
68+
# and NotImplementedError because structured calls aren't
69+
# implemented. In this instance one of the required keys should have
70+
# prefix e-id instead of ce-id therefore it should throw
71+
_ = CloudEvent(headers, test_data)
72+
73+
74+
@pytest.mark.parametrize("specversion", ['1.0', '0.3'])
75+
def test_emit_binary_event(specversion):
76+
headers = {
77+
"ce-id": "my-id",
78+
"ce-source": "<event-source>",
79+
"ce-type": "cloudevent.event.type",
80+
"ce-specversion": specversion,
81+
"Content-Type": "application/json"
82+
}
83+
event = CloudEvent(headers, test_data)
84+
_, r = app.test_client.post(
85+
"/event",
86+
headers=event.headers,
87+
data=json.dumps(event.data)
88+
)
89+
90+
# Convert byte array to dict
91+
# e.g. r.body = b'{"payload-content": "Hello World!"}'
92+
body = json.loads(r.body.decode('utf-8'))
93+
94+
# Check response fields
95+
for key in test_data:
96+
assert body[key] == test_data[key]
97+
for key in headers:
98+
assert r.headers[key] == headers[key]
99+
assert r.status_code == 200
100+
101+
102+
@pytest.mark.parametrize("specversion", ['1.0', '0.3'])
103+
def test_missing_ce_prefix_binary_event(specversion):
104+
headers = {
105+
"ce-id": "my-id",
106+
"ce-source": "<event-source>",
107+
"ce-type": "cloudevent.event.type",
108+
"ce-specversion": specversion
109+
}
110+
for key in headers:
111+
val = headers.pop(key)
112+
113+
# breaking prefix e.g. e-id instead of ce-id
114+
headers[key[1:]] = val
115+
with pytest.raises((TypeError, NotImplementedError)):
116+
# CloudEvent constructor throws TypeError if missing required field
117+
# and NotImplementedError because structured calls aren't
118+
# implemented. In this instance one of the required keys should have
119+
# prefix e-id instead of ce-id therefore it should throw
120+
_ = CloudEvent(headers, test_data)
121+
122+
123+
@pytest.mark.parametrize("specversion", ['1.0', '0.3'])
124+
def test_valid_cloud_events(specversion):
125+
# Test creating multiple cloud events
126+
events_queue = []
127+
headers = {}
128+
num_cloudevents = 30
129+
for i in range(num_cloudevents):
130+
headers = {
131+
"ce-id": f"id{i}",
132+
"ce-source": f"source{i}.com.test",
133+
"ce-type": f"cloudevent.test.type",
134+
"ce-specversion": specversion
135+
}
136+
data = {'payload': f"payload-{i}"}
137+
events_queue.append(CloudEvent(headers, data))
138+
139+
for i, event in enumerate(events_queue):
140+
headers = event.headers
141+
data = event.data
142+
143+
assert headers['ce-id'] == f"id{i}"
144+
assert headers['ce-source'] == f"source{i}.com.test"
145+
assert headers['ce-specversion'] == specversion
146+
assert data['payload'] == f"payload-{i}"

requirements/test.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ pytest==4.0.0
77
pytest-cov==2.4.0
88
# web app tests
99
sanic
10-
aiohttp
10+
aiohttp
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
import sys
15+
import io
16+
from cloudevents.sdk.http_events import CloudEvent
17+
18+
import requests
19+
20+
if __name__ == "__main__":
21+
# expects a url from command line. e.g.
22+
# python3 sample-server.py http://localhost:3000/event
23+
if len(sys.argv) < 2:
24+
sys.exit("Usage: python with_requests.py "
25+
"<CloudEvents controller URL>")
26+
27+
url = sys.argv[1]
28+
29+
# CloudEvent headers and data
30+
headers = {
31+
"ce-id": "my-id",
32+
"ce-source": "<event-source>",
33+
"ce-type": "cloudevent.event.type",
34+
"ce-specversion": "1.0"
35+
}
36+
data = {"payload-content": "Hello World!"}
37+
38+
# Create a CloudEvent
39+
event = CloudEvent(headers=headers, data=data)
40+
41+
# Print the created CloudEvent then send it to some url we got from
42+
# command line
43+
print(f"Sent {event}")
44+
requests.post(url, headers=event.headers, json=event.data)

0 commit comments

Comments
 (0)