Skip to content

Commit bf41b2e

Browse files
Bugfix/set default headers for properties in pika (open-telemetry#740)
* Bugfix the porperties.headers in the pika instrumentation, and write tests that ensure it * Ensure that BasicProperties is called with headers={} * Update CHANGELOG.md * Update the span kind from SERVER/CLIENT to PRODUCER/CONSUMER Co-authored-by: Diego Hurtado <[email protected]>
1 parent 4e3aaa5 commit bf41b2e

File tree

3 files changed

+142
-11
lines changed

3 files changed

+142
-11
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
- `opentelemetry-util-http` no longer contains an instrumentation entrypoint and will not be loaded
1212
automatically by the auto instrumentor.
1313
([#745](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/745))
14+
- `opentelemetry-instrumentation-pika` Bugfix use properties.headers. It will prevent the header injection from raising.
15+
([#740](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/740))
1416

1517
## [1.6.0-0.25b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.0-0.25b0) - 2021-10-13
1618
### Added

instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
MessagingOperationValues,
1111
SpanAttributes,
1212
)
13-
from opentelemetry.trace import Tracer
13+
from opentelemetry.trace import SpanKind, Tracer
1414
from opentelemetry.trace.span import Span
1515

1616

@@ -40,16 +40,15 @@ def decorated_callback(
4040
body: bytes,
4141
) -> Any:
4242
if not properties:
43-
properties = BasicProperties()
44-
if properties.headers is None:
45-
properties.headers = {}
43+
properties = BasicProperties(headers={})
4644
ctx = propagate.extract(properties.headers, getter=_pika_getter)
4745
if not ctx:
4846
ctx = context.get_current()
4947
span = _get_span(
5048
tracer,
5149
channel,
5250
properties,
51+
span_kind=SpanKind.CONSUMER,
5352
task_name=task_name,
5453
ctx=ctx,
5554
operation=MessagingOperationValues.RECEIVE,
@@ -74,12 +73,13 @@ def decorated_function(
7473
mandatory: bool = False,
7574
) -> Any:
7675
if not properties:
77-
properties = BasicProperties()
76+
properties = BasicProperties(headers={})
7877
ctx = context.get_current()
7978
span = _get_span(
8079
tracer,
8180
channel,
8281
properties,
82+
span_kind=SpanKind.PRODUCER,
8383
task_name="(temporary)",
8484
ctx=ctx,
8585
operation=None,
@@ -104,6 +104,7 @@ def _get_span(
104104
channel: Channel,
105105
properties: BasicProperties,
106106
task_name: str,
107+
span_kind: SpanKind,
107108
ctx: context.Context,
108109
operation: Optional[MessagingOperationValues] = None,
109110
) -> Optional[Span]:
@@ -113,7 +114,9 @@ def _get_span(
113114
return None
114115
task_name = properties.type if properties.type else task_name
115116
span = tracer.start_span(
116-
context=ctx, name=_generate_span_name(task_name, operation)
117+
context=ctx,
118+
name=_generate_span_name(task_name, operation),
119+
kind=span_kind,
117120
)
118121
if span.is_recording():
119122
_enrich_span(span, channel, properties, task_name, operation)

instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py

+131-5
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,15 @@
1313
# limitations under the License.
1414
from unittest import TestCase, mock
1515

16+
from pika.channel import Channel
17+
from pika.spec import Basic, BasicProperties
18+
1619
from opentelemetry.instrumentation.pika import utils
17-
from opentelemetry.semconv.trace import SpanAttributes
18-
from opentelemetry.trace import Span, Tracer
20+
from opentelemetry.semconv.trace import (
21+
MessagingOperationValues,
22+
SpanAttributes,
23+
)
24+
from opentelemetry.trace import Span, SpanKind, Tracer
1925

2026

2127
class TestUtils(TestCase):
@@ -32,12 +38,15 @@ def test_get_span(
3238
channel = mock.MagicMock()
3339
properties = mock.MagicMock()
3440
task_name = "test.test"
41+
span_kind = mock.MagicMock(spec=SpanKind)
3542
get_value.return_value = None
3643
ctx = mock.MagicMock()
37-
_ = utils._get_span(tracer, channel, properties, task_name, ctx)
44+
_ = utils._get_span(
45+
tracer, channel, properties, task_name, span_kind, ctx
46+
)
3847
generate_span_name.assert_called_once()
3948
tracer.start_span.assert_called_once_with(
40-
context=ctx, name=generate_span_name.return_value
49+
context=ctx, name=generate_span_name.return_value, kind=span_kind
4150
)
4251
enrich_span.assert_called_once()
4352

@@ -54,9 +63,12 @@ def test_get_span_suppressed(
5463
channel = mock.MagicMock()
5564
properties = mock.MagicMock()
5665
task_name = "test.test"
66+
span_kind = mock.MagicMock(spec=SpanKind)
5767
get_value.return_value = True
5868
ctx = mock.MagicMock()
59-
span = utils._get_span(tracer, channel, properties, task_name, ctx)
69+
span = utils._get_span(
70+
tracer, channel, properties, task_name, span_kind, ctx
71+
)
6072
self.assertEqual(span, None)
6173
generate_span_name.assert_not_called()
6274
enrich_span.assert_not_called()
@@ -158,3 +170,117 @@ def test_enrich_span_unique_connection() -> None:
158170
),
159171
],
160172
)
173+
174+
@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
175+
@mock.patch("opentelemetry.propagate.extract")
176+
@mock.patch("opentelemetry.trace.use_span")
177+
def test_decorate_callback(
178+
self,
179+
use_span: mock.MagicMock,
180+
extract: mock.MagicMock,
181+
get_span: mock.MagicMock,
182+
) -> None:
183+
callback = mock.MagicMock()
184+
mock_task_name = "mock_task_name"
185+
tracer = mock.MagicMock()
186+
channel = mock.MagicMock(spec=Channel)
187+
method = mock.MagicMock(spec=Basic.Deliver)
188+
properties = mock.MagicMock()
189+
mock_body = b"mock_body"
190+
decorated_callback = utils._decorate_callback(
191+
callback, tracer, mock_task_name
192+
)
193+
retval = decorated_callback(channel, method, properties, mock_body)
194+
extract.assert_called_once_with(
195+
properties.headers, getter=utils._pika_getter
196+
)
197+
get_span.assert_called_once_with(
198+
tracer,
199+
channel,
200+
properties,
201+
span_kind=SpanKind.CONSUMER,
202+
task_name=mock_task_name,
203+
ctx=extract.return_value,
204+
operation=MessagingOperationValues.RECEIVE,
205+
)
206+
use_span.assert_called_once_with(
207+
get_span.return_value, end_on_exit=True
208+
)
209+
callback.assert_called_once_with(
210+
channel, method, properties, mock_body
211+
)
212+
self.assertEqual(retval, callback.return_value)
213+
214+
@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
215+
@mock.patch("opentelemetry.propagate.inject")
216+
@mock.patch("opentelemetry.context.get_current")
217+
@mock.patch("opentelemetry.trace.use_span")
218+
def test_decorate_basic_publish(
219+
self,
220+
use_span: mock.MagicMock,
221+
get_current: mock.MagicMock,
222+
inject: mock.MagicMock,
223+
get_span: mock.MagicMock,
224+
) -> None:
225+
callback = mock.MagicMock()
226+
tracer = mock.MagicMock()
227+
channel = mock.MagicMock(spec=Channel)
228+
method = mock.MagicMock(spec=Basic.Deliver)
229+
properties = mock.MagicMock()
230+
mock_body = b"mock_body"
231+
decorated_basic_publish = utils._decorate_basic_publish(
232+
callback, channel, tracer
233+
)
234+
retval = decorated_basic_publish(
235+
channel, method, mock_body, properties
236+
)
237+
get_current.assert_called_once()
238+
get_span.assert_called_once_with(
239+
tracer,
240+
channel,
241+
properties,
242+
span_kind=SpanKind.PRODUCER,
243+
task_name="(temporary)",
244+
ctx=get_current.return_value,
245+
operation=None,
246+
)
247+
use_span.assert_called_once_with(
248+
get_span.return_value, end_on_exit=True
249+
)
250+
get_span.return_value.is_recording.assert_called_once()
251+
inject.assert_called_once_with(properties.headers)
252+
callback.assert_called_once_with(
253+
channel, method, mock_body, properties, False
254+
)
255+
self.assertEqual(retval, callback.return_value)
256+
257+
@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
258+
@mock.patch("opentelemetry.propagate.inject")
259+
@mock.patch("opentelemetry.context.get_current")
260+
@mock.patch("opentelemetry.trace.use_span")
261+
@mock.patch("pika.spec.BasicProperties.__new__")
262+
def test_decorate_basic_publish_no_properties(
263+
self,
264+
basic_properties: mock.MagicMock,
265+
use_span: mock.MagicMock,
266+
get_current: mock.MagicMock,
267+
inject: mock.MagicMock,
268+
get_span: mock.MagicMock,
269+
) -> None:
270+
callback = mock.MagicMock()
271+
tracer = mock.MagicMock()
272+
channel = mock.MagicMock(spec=Channel)
273+
method = mock.MagicMock(spec=Basic.Deliver)
274+
mock_body = b"mock_body"
275+
decorated_basic_publish = utils._decorate_basic_publish(
276+
callback, channel, tracer
277+
)
278+
retval = decorated_basic_publish(channel, method, body=mock_body)
279+
basic_properties.assert_called_once_with(BasicProperties, headers={})
280+
get_current.assert_called_once()
281+
use_span.assert_called_once_with(
282+
get_span.return_value, end_on_exit=True
283+
)
284+
get_span.return_value.is_recording.assert_called_once()
285+
inject.assert_called_once_with(basic_properties.return_value.headers)
286+
self.assertEqual(retval, callback.return_value)

0 commit comments

Comments
 (0)