|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | | -import functools |
| 15 | +import enum |
16 | 16 | import importlib.util |
| 17 | +import io |
| 18 | +import json |
17 | 19 | import os.path |
18 | 20 | import pathlib |
19 | 21 | import sys |
20 | 22 | import types |
21 | 23 |
|
| 24 | +import cloudevents.sdk |
| 25 | +import cloudevents.sdk.event |
| 26 | +import cloudevents.sdk.event.v1 |
| 27 | +import cloudevents.sdk.marshaller |
22 | 28 | import flask |
23 | 29 | import werkzeug |
24 | 30 |
|
|
35 | 41 | DEFAULT_SIGNATURE_TYPE = "http" |
36 | 42 |
|
37 | 43 |
|
| 44 | +class _EventType(enum.Enum): |
| 45 | + LEGACY = 1 |
| 46 | + CLOUDEVENT_BINARY = 2 |
| 47 | + CLOUDEVENT_STRUCTURED = 3 |
| 48 | + |
| 49 | + |
38 | 50 | class _Event(object): |
39 | 51 | """Event passed to background functions.""" |
40 | 52 |
|
@@ -67,38 +79,83 @@ def view_func(path): |
67 | 79 | return view_func |
68 | 80 |
|
69 | 81 |
|
70 | | -def _is_binary_cloud_event(request): |
71 | | - return ( |
| 82 | +def _get_cloudevent_version(): |
| 83 | + return cloudevents.sdk.event.v1.Event() |
| 84 | + |
| 85 | + |
| 86 | +def _run_legacy_event(function, request): |
| 87 | + event_data = request.get_json() |
| 88 | + if not event_data: |
| 89 | + flask.abort(400) |
| 90 | + event_object = _Event(**event_data) |
| 91 | + data = event_object.data |
| 92 | + context = Context(**event_object.context) |
| 93 | + function(data, context) |
| 94 | + |
| 95 | + |
| 96 | +def _run_binary_cloudevent(function, request, cloudevent_def): |
| 97 | + data = io.BytesIO(request.get_data()) |
| 98 | + http_marshaller = cloudevents.sdk.marshaller.NewDefaultHTTPMarshaller() |
| 99 | + event = http_marshaller.FromRequest( |
| 100 | + cloudevent_def, request.headers, data, json.load |
| 101 | + ) |
| 102 | + |
| 103 | + function(event) |
| 104 | + |
| 105 | + |
| 106 | +def _run_structured_cloudevent(function, request, cloudevent_def): |
| 107 | + data = io.StringIO(request.get_data(as_text=True)) |
| 108 | + m = cloudevents.sdk.marshaller.NewDefaultHTTPMarshaller() |
| 109 | + event = m.FromRequest(cloudevent_def, request.headers, data, json.loads) |
| 110 | + function(event) |
| 111 | + |
| 112 | + |
| 113 | +def _get_event_type(request): |
| 114 | + if ( |
72 | 115 | request.headers.get("ce-type") |
73 | 116 | and request.headers.get("ce-specversion") |
74 | 117 | and request.headers.get("ce-source") |
75 | 118 | and request.headers.get("ce-id") |
76 | | - ) |
| 119 | + ): |
| 120 | + return _EventType.CLOUDEVENT_BINARY |
| 121 | + elif request.headers.get("Content-Type") == "application/cloudevents+json": |
| 122 | + return _EventType.CLOUDEVENT_STRUCTURED |
| 123 | + else: |
| 124 | + return _EventType.LEGACY |
77 | 125 |
|
78 | 126 |
|
79 | 127 | def _event_view_func_wrapper(function, request): |
80 | 128 | def view_func(path): |
81 | | - if _is_binary_cloud_event(request): |
82 | | - # Support CloudEvents in binary content mode, with data being the |
83 | | - # whole request body and context attributes retrieved from request |
84 | | - # headers. |
85 | | - data = request.get_data() |
86 | | - context = Context( |
87 | | - eventId=request.headers.get("ce-eventId"), |
88 | | - timestamp=request.headers.get("ce-timestamp"), |
89 | | - eventType=request.headers.get("ce-eventType"), |
90 | | - resource=request.headers.get("ce-resource"), |
| 129 | + if _get_event_type(request) == _EventType.LEGACY: |
| 130 | + _run_legacy_event(function, request) |
| 131 | + else: |
| 132 | + # here for defensive backwards compatibility in case we make a mistake in rollout. |
| 133 | + flask.abort( |
| 134 | + 400, |
| 135 | + description="The FUNCTION_SIGNATURE_TYPE for this function is set to event " |
| 136 | + "but no Google Cloud Functions Event was given. If you are using CloudEvents set " |
| 137 | + "FUNCTION_SIGNATURE_TYPE=cloudevent", |
91 | 138 | ) |
92 | | - function(data, context) |
| 139 | + |
| 140 | + return "OK" |
| 141 | + |
| 142 | + return view_func |
| 143 | + |
| 144 | + |
| 145 | +def _cloudevent_view_func_wrapper(function, request): |
| 146 | + def view_func(path): |
| 147 | + cloudevent_def = _get_cloudevent_version() |
| 148 | + event_type = _get_event_type(request) |
| 149 | + if event_type == _EventType.CLOUDEVENT_STRUCTURED: |
| 150 | + _run_structured_cloudevent(function, request, cloudevent_def) |
| 151 | + elif event_type == _EventType.CLOUDEVENT_BINARY: |
| 152 | + _run_binary_cloudevent(function, request, cloudevent_def) |
93 | 153 | else: |
94 | | - # This is a regular CloudEvent |
95 | | - event_data = request.get_json() |
96 | | - if not event_data: |
97 | | - flask.abort(400) |
98 | | - event_object = _Event(**event_data) |
99 | | - data = event_object.data |
100 | | - context = Context(**event_object.context) |
101 | | - function(data, context) |
| 154 | + flask.abort( |
| 155 | + 400, |
| 156 | + description="Function was defined with FUNCTION_SIGNATURE_TYPE=cloudevent " |
| 157 | + " but it did not receive a cloudevent as a request.", |
| 158 | + ) |
102 | 159 |
|
103 | 160 | return "OK" |
104 | 161 |
|
@@ -179,19 +236,27 @@ def create_app(target=None, source=None, signature_type=None): |
179 | 236 | app.url_map.add(werkzeug.routing.Rule("/<path:path>", endpoint="run")) |
180 | 237 | app.view_functions["run"] = _http_view_func_wrapper(function, flask.request) |
181 | 238 | app.view_functions["error"] = lambda: flask.abort(404, description="Not Found") |
182 | | - elif signature_type == "event": |
| 239 | + elif signature_type == "event" or signature_type == "cloudevent": |
183 | 240 | app.url_map.add( |
184 | 241 | werkzeug.routing.Rule( |
185 | | - "/", defaults={"path": ""}, endpoint="run", methods=["POST"] |
| 242 | + "/", defaults={"path": ""}, endpoint=signature_type, methods=["POST"] |
186 | 243 | ) |
187 | 244 | ) |
188 | 245 | app.url_map.add( |
189 | | - werkzeug.routing.Rule("/<path:path>", endpoint="run", methods=["POST"]) |
| 246 | + werkzeug.routing.Rule( |
| 247 | + "/<path:path>", endpoint=signature_type, methods=["POST"] |
| 248 | + ) |
190 | 249 | ) |
191 | | - app.view_functions["run"] = _event_view_func_wrapper(function, flask.request) |
| 250 | + |
192 | 251 | # Add a dummy endpoint for GET / |
193 | 252 | app.url_map.add(werkzeug.routing.Rule("/", endpoint="get", methods=["GET"])) |
194 | 253 | app.view_functions["get"] = lambda: "" |
| 254 | + |
| 255 | + # Add the view functions |
| 256 | + app.view_functions["event"] = _event_view_func_wrapper(function, flask.request) |
| 257 | + app.view_functions["cloudevent"] = _cloudevent_view_func_wrapper( |
| 258 | + function, flask.request |
| 259 | + ) |
195 | 260 | else: |
196 | 261 | raise FunctionsFrameworkException( |
197 | 262 | "Invalid signature type: {signature_type}".format( |
|
0 commit comments