-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathtest_redis.py
More file actions
79 lines (71 loc) · 3.34 KB
/
test_redis.py
File metadata and controls
79 lines (71 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import unittest
from azure.functions.decorators.constants import REDIS_PUBSUB_TRIGGER, REDIS_LIST_TRIGGER, REDIS_STREAM_TRIGGER
from azure.functions.decorators.core import BindingDirection, DataType
from azure.functions.decorators.redis import RedisPubSubTrigger, RedisListTrigger, RedisStreamTrigger
class TestRedis(unittest.TestCase):
def test_pubsub_trigger_valid_creation(self):
trigger = RedisPubSubTrigger(name="req",
connectionStringSetting="dummy_connection",
channel="dummy_channel",
data_type=DataType.UNDEFINED,
dummy_field="dummy")
self.assertEqual(trigger.get_binding_name(), "redisPubSubTrigger")
self.assertEqual(trigger.get_dict_repr(), {
"type": REDIS_PUBSUB_TRIGGER,
"direction": BindingDirection.IN,
'dummyField': 'dummy',
"name": "req",
"dataType": DataType.UNDEFINED,
"connectionStringSetting": "dummy_connection",
"channel": "dummy_channel"
})
def test_list_trigger_valid_creation(self):
trigger = RedisListTrigger(name="req",
connectionStringSetting="dummy_connection",
key="dummy_key",
pollingIntervalInMs=1,
messagesPerWorker=2,
count=3,
listPopFromBeginning=False,
data_type=DataType.UNDEFINED,
dummy_field="dummy")
self.assertEqual(trigger.get_binding_name(), "redisListTrigger")
self.assertEqual(trigger.get_dict_repr(), {
"type": REDIS_LIST_TRIGGER,
"direction": BindingDirection.IN,
'dummyField': 'dummy',
"name": "req",
"dataType": DataType.UNDEFINED,
"connectionStringSetting": "dummy_connection",
"channel": "dummy_channel",
"pollingIntervalInMs": 1,
"messagesPerWorker": 2,
"count": 3,
"listPopFromBeginning": False,
})
def test_stream_trigger_valid_creation(self):
trigger = RedisStreamTrigger(name="req",
connectionStringSetting="dummy_connection",
key="dummy_key",
pollingIntervalInMs=1,
messagesPerWorker=2,
count=3,
deleteAfterProcess=True,
data_type=DataType.UNDEFINED,
dummy_field="dummy")
self.assertEqual(trigger.get_binding_name(), "redisStreamTrigger")
self.assertEqual(trigger.get_dict_repr(), {
"type": REDIS_STREAM_TRIGGER,
"direction": BindingDirection.IN,
'dummyField': 'dummy',
"name": "req",
"dataType": DataType.UNDEFINED,
"connectionStringSetting": "dummy_connection",
"channel": "dummy_channel",
"pollingIntervalInMs": 1,
"messagesPerWorker": 2,
"count": 3,
"deleteAfterProcess": True,
})