-
Notifications
You must be signed in to change notification settings - Fork 301
/
Copy pathutils.py
496 lines (405 loc) · 17.3 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
import inspect
import operator
import warnings
from collections import OrderedDict
import inflection
from django.conf import settings
from django.db.models import Manager
from django.db.models.fields.related_descriptors import (
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
)
from django.http import Http404
from django.utils import encoding
from django.utils.translation import gettext_lazy as _
from rest_framework import exceptions, relations
from rest_framework.exceptions import APIException
from .settings import json_api_settings
# Generic relation descriptor from django.contrib.contenttypes.
if "django.contrib.contenttypes" not in settings.INSTALLED_APPS: # pragma: no cover
# Target application does not use contenttypes. Importing would cause errors.
ReverseGenericManyToOneDescriptor = object()
else:
from django.contrib.contenttypes.fields import ReverseGenericManyToOneDescriptor
def get_resource_name(context, expand_polymorphic_types=False):
"""
Return the name of a resource.
"""
from rest_framework_json_api.serializers import PolymorphicModelSerializer
view = context.get("view")
# Sanity check to make sure we have a view.
if not view:
return None
# Check to see if there is a status code and return early
# with the resource_name value of `errors`.
try:
code = str(view.response.status_code)
except (AttributeError, ValueError):
pass
else:
if code.startswith("4") or code.startswith("5"):
return "errors"
try:
resource_name = getattr(view, "resource_name")
except AttributeError:
try:
if "kwargs" in context and "related_field" in context["kwargs"]:
serializer = view.get_related_serializer_class()
else:
serializer = view.get_serializer_class()
if expand_polymorphic_types and issubclass(
serializer, PolymorphicModelSerializer
):
return serializer.get_polymorphic_types()
else:
return get_resource_type_from_serializer(serializer)
except AttributeError:
try:
resource_name = get_resource_type_from_model(view.model)
except AttributeError:
resource_name = view.__class__.__name__
if not isinstance(resource_name, str):
# The resource name is not a string - return as is
return resource_name
# the name was calculated automatically from the view > pluralize and format
resource_name = format_resource_type(resource_name)
return resource_name
def get_serializer_fields(serializer):
fields = None
if hasattr(serializer, "child"):
fields = getattr(serializer.child, "fields")
meta = getattr(serializer.child, "Meta", None)
if hasattr(serializer, "fields"):
fields = getattr(serializer, "fields")
meta = getattr(serializer, "Meta", None)
if fields is not None:
meta_fields = getattr(meta, "meta_fields", {})
for field in meta_fields:
try:
fields.pop(field)
except KeyError:
pass
return fields
def format_field_names(obj, format_type=None):
"""
Takes a dict and returns it with formatted keys as set in `format_type`
or `JSON_API_FORMAT_FIELD_NAMES`
:format_type: Either 'dasherize', 'camelize', 'capitalize' or 'underscore'
"""
if format_type is None:
format_type = json_api_settings.FORMAT_FIELD_NAMES
if isinstance(obj, dict):
formatted = OrderedDict()
for key, value in obj.items():
key = format_value(key, format_type)
formatted[key] = value
return formatted
return obj
def undo_format_field_names(obj):
"""
Takes a dict and undo format field names to underscore which is the Python convention
but only in case `JSON_API_FORMAT_FIELD_NAMES` is actually configured.
"""
if json_api_settings.FORMAT_FIELD_NAMES:
return format_field_names(obj, "underscore")
return obj
def format_field_name(field_name):
"""
Takes a field name and returns it with formatted keys as set in
`JSON_API_FORMAT_FIELD_NAMES`
"""
return format_value(field_name, json_api_settings.FORMAT_FIELD_NAMES)
def undo_format_field_name(field_name):
"""
Takes a string and undos format field name to underscore which is the Python convention
but only in case `JSON_API_FORMAT_FIELD_NAMES` is actually configured.
"""
if json_api_settings.FORMAT_FIELD_NAMES:
return format_value(field_name, "underscore")
return field_name
def format_link_segment(value, format_type=None):
"""
Takes a string value and returns it with formatted keys as set in `format_type`
or `JSON_API_FORMAT_RELATED_LINKS`.
:format_type: Either 'dasherize', 'camelize', 'capitalize' or 'underscore'
"""
if format_type is None:
format_type = json_api_settings.FORMAT_RELATED_LINKS
else:
warnings.warn(
DeprecationWarning(
"Using `format_type` argument is deprecated."
"Use `format_value` instead."
)
)
return format_value(value, format_type)
def undo_format_link_segment(value):
"""
Takes a link segment and undos format link segment to underscore which is the Python convention
but only in case `JSON_API_FORMAT_RELATED_LINKS` is actually configured.
"""
if json_api_settings.FORMAT_RELATED_LINKS:
return format_value(value, "underscore")
return value
def format_value(value, format_type=None):
if format_type is None:
warnings.warn(
DeprecationWarning(
"Using `format_value` without passing on `format_type` argument is deprecated."
"Use `format_field_name` instead."
)
)
format_type = json_api_settings.FORMAT_FIELD_NAMES
if format_type == "dasherize":
# inflection can't dasherize camelCase
value = inflection.underscore(value)
value = inflection.dasherize(value)
elif format_type == "camelize":
value = inflection.camelize(value, False)
elif format_type == "capitalize":
value = inflection.camelize(value)
elif format_type == "underscore":
value = inflection.underscore(value)
return value
def format_resource_type(value, format_type=None, pluralize=None):
if format_type is None:
format_type = json_api_settings.FORMAT_TYPES
if pluralize is None:
pluralize = json_api_settings.PLURALIZE_TYPES
if format_type:
value = format_value(value, format_type)
return inflection.pluralize(value) if pluralize else value
def get_related_resource_type(relation):
from rest_framework_json_api.serializers import PolymorphicModelSerializer
try:
return get_resource_type_from_serializer(relation)
except AttributeError:
pass
relation_model = None
if hasattr(relation, "_meta"):
relation_model = relation._meta.model
elif hasattr(relation, "model"):
# the model type was explicitly passed as a kwarg to ResourceRelatedField
relation_model = relation.model
elif hasattr(relation, "get_queryset") and relation.get_queryset() is not None:
relation_model = relation.get_queryset().model
elif (
getattr(relation, "many", False)
and hasattr(relation.child, "Meta")
and hasattr(relation.child.Meta, "model")
):
# For ManyToMany relationships, get the model from the child
# serializer of the list serializer
relation_model = relation.child.Meta.model
else:
parent_serializer = relation.parent
parent_model = None
if isinstance(parent_serializer, PolymorphicModelSerializer):
parent_model = parent_serializer.get_polymorphic_serializer_for_instance(
parent_serializer.instance
).Meta.model
elif hasattr(parent_serializer, "Meta"):
parent_model = getattr(parent_serializer.Meta, "model", None)
elif hasattr(parent_serializer, "parent") and hasattr(
parent_serializer.parent, "Meta"
):
parent_model = getattr(parent_serializer.parent.Meta, "model", None)
if parent_model is not None:
if relation.source:
if relation.source != "*":
parent_model_relation = getattr(parent_model, relation.source)
else:
parent_model_relation = getattr(parent_model, relation.field_name)
else:
parent_model_relation = getattr(
parent_model, parent_serializer.field_name
)
parent_model_relation_type = type(parent_model_relation)
if parent_model_relation_type is ReverseManyToOneDescriptor:
relation_model = parent_model_relation.rel.related_model
elif parent_model_relation_type is ManyToManyDescriptor:
relation_model = parent_model_relation.field.remote_field.model
# In case we are in a reverse relation
if relation_model == parent_model:
relation_model = parent_model_relation.field.model
elif parent_model_relation_type is ReverseGenericManyToOneDescriptor:
relation_model = parent_model_relation.rel.model
elif hasattr(parent_model_relation, "field"):
try:
relation_model = parent_model_relation.field.remote_field.model
except AttributeError:
relation_model = parent_model_relation.field.related.model
else:
return get_related_resource_type(parent_model_relation)
if relation_model is None:
# For ManyRelatedFields on plain Serializers the resource_type
# cannot be determined from a model, so we must get it from the
# child_relation
if hasattr(relation, "child_relation"):
return get_related_resource_type(relation.child_relation)
raise APIException(
_("Could not resolve resource type for relation %s" % relation)
)
return get_resource_type_from_model(relation_model)
def get_resource_type_from_model(model):
json_api_meta = getattr(model, "JSONAPIMeta", None)
return getattr(json_api_meta, "resource_name", format_resource_type(model.__name__))
def get_resource_type_from_queryset(qs):
return get_resource_type_from_model(qs.model)
def get_resource_type_from_instance(instance):
if hasattr(instance, "_meta"):
return get_resource_type_from_model(instance._meta.model)
def get_resource_type_from_manager(manager):
return get_resource_type_from_model(manager.model)
def get_resource_type_from_serializer(serializer):
json_api_meta = getattr(serializer, "JSONAPIMeta", None)
meta = getattr(serializer, "Meta", None)
if hasattr(json_api_meta, "resource_name"):
return json_api_meta.resource_name
elif hasattr(meta, "resource_name"):
return meta.resource_name
elif hasattr(meta, "model"):
return get_resource_type_from_model(meta.model)
raise AttributeError(
f"can not detect 'resource_name' on serializer '{serializer.__class__.__name__}'"
f" in module '{serializer.__class__.__module__}'"
)
def get_included_resources(request, serializer=None):
"""Build a list of included resources."""
include_resources_param = request.query_params.get("include") if request else None
if include_resources_param:
return include_resources_param.split(",")
else:
return get_default_included_resources_from_serializer(serializer)
def get_default_included_resources_from_serializer(serializer):
meta = getattr(serializer, "JSONAPIMeta", None)
if meta is None and getattr(serializer, "many", False):
meta = getattr(serializer.child, "JSONAPIMeta", None)
return list(getattr(meta, "included_resources", []))
def get_included_serializers(serializer):
warnings.warn(
DeprecationWarning(
"Using of `get_included_serializers(serializer)` function is deprecated."
"Use `serializer.included_serializers` instead."
)
)
return getattr(serializer, "included_serializers", dict())
def get_relation_instance(resource_instance, source, serializer):
try:
relation_instance = operator.attrgetter(source)(resource_instance)
except AttributeError:
# if the field is not defined on the model then we check the serializer
# and if no value is there we skip over the field completely
serializer_method = getattr(serializer, source, None)
if serializer_method and hasattr(serializer_method, "__call__"):
relation_instance = serializer_method(resource_instance)
else:
return False, None
if isinstance(relation_instance, Manager):
relation_instance = relation_instance.all()
return True, relation_instance
def is_relationship_field(field):
return isinstance(field, (relations.RelatedField, relations.ManyRelatedField))
class Hyperlink(str):
"""
A string like object that additionally has an associated name.
We use this for hyperlinked URLs that may render as a named link
in some contexts, or render as a plain URL in others.
Comes from Django REST framework 3.2
https://github.com/tomchristie/django-rest-framework
"""
def __new__(self, url, name):
ret = str.__new__(self, url)
ret.name = name
return ret
is_hyperlink = True
def format_drf_errors(response, context, exc):
errors = []
# handle generic errors. ValidationError('test') in a view for example
if isinstance(response.data, list):
for message in response.data:
errors.extend(format_error_object(message, "/data", response))
# handle all errors thrown from serializers
else:
# Avoid circular deps
from rest_framework import generics
has_serializer = isinstance(context["view"], generics.GenericAPIView)
if has_serializer:
serializer = context["view"].get_serializer()
fields = get_serializer_fields(serializer) or dict()
relationship_fields = [
name for name, field in fields.items() if is_relationship_field(field)
]
for field, error in response.data.items():
field = format_field_name(field)
pointer = None
# pointer can be determined only if there's a serializer.
if has_serializer:
rel = "relationships" if field in relationship_fields else "attributes"
pointer = "/data/{}/{}".format(rel, field)
if isinstance(exc, Http404) and isinstance(error, str):
# 404 errors don't have a pointer
errors.extend(format_error_object(error, None, response))
elif isinstance(error, str):
classes = inspect.getmembers(exceptions, inspect.isclass)
# DRF sets the `field` to 'detail' for its own exceptions
if isinstance(exc, tuple(x[1] for x in classes)):
pointer = "/data"
errors.extend(format_error_object(error, pointer, response))
elif isinstance(error, list):
errors.extend(format_error_object(error, pointer, response))
else:
errors.extend(format_error_object(error, pointer, response))
context["view"].resource_name = "errors"
response.data = errors
return response
def format_error_object(message, pointer, response):
errors = []
if isinstance(message, dict):
# as there is no required field in error object we check that all fields are string
# except links and source which might be a dict
is_custom_error = all(
[
isinstance(value, str)
for key, value in message.items()
if key not in ["links", "source"]
]
)
if is_custom_error:
if "source" not in message:
message["source"] = {}
message["source"] = {
"pointer": pointer,
}
errors.append(message)
else:
for k, v in message.items():
errors.extend(
format_error_object(v, pointer + "/{}".format(k), response)
)
elif isinstance(message, list):
for num, error in enumerate(message):
if isinstance(error, (list, dict)):
new_pointer = pointer + "/{}".format(num)
else:
new_pointer = pointer
if error:
errors.extend(format_error_object(error, new_pointer, response))
else:
error_obj = {
"detail": message,
"status": encoding.force_str(response.status_code),
}
if pointer is not None:
error_obj["source"] = {
"pointer": pointer,
}
code = getattr(message, "code", None)
if code is not None:
error_obj["code"] = code
errors.append(error_obj)
return errors
def format_errors(data):
if len(data) > 1 and isinstance(data, list):
data.sort(key=lambda x: x.get("source", {}).get("pointer", ""))
return {"errors": data}