 class Command:
     args = None
-    client_id = 0
+    sync_id = 0  # Commands with the same sync_id will be executed synchronously


 class TwitterCacheTraceParser:
     """
@@ -29,7 +29,7 @@ def parse(self, csv) -> Command:
         ttl = csv[6]
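+        # Column layout assumed here (not shown in this diff): the published
+        # Twitter production cache traces use timestamp, key, key size,
+        # value size, client id, operation, TTL, so csv[6] is the TTL.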

         cmd = Command()
-        cmd.client_id = client_id
+        cmd.sync_id = client_id

         if operation == "get":
             cmd.args = ["GET", key]
@@ -58,29 +58,20 @@ def parse(self, csv) -> Command:


 class AsyncWorker:
     QUEUE_SIZE = 100000
-    BATCH_SIZE = 20

     def __init__(self, redis_client) -> None:
         self.queue = asyncio.Queue(self.QUEUE_SIZE)
         self.redis_client = redis_client
         self.working = False

-    async def put(self, cmd: Command) -> None:
-        await self.queue.put(cmd)
+    async def put(self, batch: list) -> None:
+        await self.queue.put(batch)

     async def work(self) -> None:
         self.working = True
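+        # Batches are now assembled upstream by the reader; the worker just
+        # drains ready-made batches and pipelines each one to Redis.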
-        batch = []
-        while self.working or len(batch) > 0 or not self.queue.empty():
-            try:
-                cmd = await asyncio.wait_for(self.queue.get(), timeout=1.0)
-                batch.append(cmd)
-                if len(batch) >= self.BATCH_SIZE:
-                    await self.execute(batch)
-                    batch.clear()
-            except asyncio.exceptions.TimeoutError:
-                await self.execute(batch)
-                batch.clear()
+        while self.working or not self.queue.empty():
+            batch = await self.queue.get()
+            await self.execute(batch)

     async def execute(self, batch) -> None:
         async with self.redis_client.pipeline(transaction=False) as pipe:
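+            # transaction=False sends the batch as a plain pipeline (no
+            # MULTI/EXEC wrapping): one round trip, no transactional overhead.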
@@ -97,32 +88,32 @@ def stop(self) -> None:
 class AsyncWorkerPool:
     """
     Manages a worker pool to send commands in parallel
-    Maintains synchronous order for commands with the same client id
+    Maintains synchronous order for commands with the same sync_id
     """
     def __init__(self, redis_client, num_workers) -> None:
         self.redis_client = redis_client
         self.num_workers = num_workers
         self.workers = []
         self.tasks = []
-        self.client_id_to_worker = {}
+        self.sync_id_to_worker = {}
         self.next_worker_index = -1

-    def allocate(self, client_id) -> AsyncWorker:
-        if not client_id in self.client_id_to_worker:
+    def allocate(self, sync_id) -> AsyncWorker:
+        if sync_id not in self.sync_id_to_worker:
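+            # First time this sync_id appears: pin it to the next worker,
+            # round-robin. The mapping is sticky, so commands that share a
+            # sync_id always go to the same worker and stay ordered.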
             self.next_worker_index = (self.next_worker_index + 1) % self.num_workers

             if len(self.workers) <= self.next_worker_index:
                 assert len(self.workers) == self.next_worker_index
                 self.workers.append(AsyncWorker(self.redis_client))
                 self.tasks.append(self.workers[self.next_worker_index].start())

-            self.client_id_to_worker[client_id] = self.workers[self.next_worker_index]
+            self.sync_id_to_worker[sync_id] = self.workers[self.next_worker_index]

-        return self.client_id_to_worker[client_id]
+        return self.sync_id_to_worker[sync_id]

-    async def put(self, cmd: Command) -> None:
-        worker = self.allocate(cmd.client_id)
-        await worker.put(cmd)
+    async def put(self, batch: list, sync_id: int) -> None:
+        worker = self.allocate(sync_id)
+        await worker.put(batch)

     async def stop(self):
         for worker in self.workers:
@@ -131,18 +122,40 @@ async def stop(self):


 class AsyncPlayer:
+    READ_BATCH_SIZE = 10 * 1000 * 1000
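+    # Number of trace lines to buffer (across all sync_ids) before flushing
+    # the accumulated batches to the worker pool.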
+
     def __init__(self, redis_uri, num_workers) -> None:
         self.redis_uri = redis_uri
         self.redis_client = aioredis.from_url(f"redis://{self.redis_uri}", encoding="utf-8", decode_responses=True)
         self.worker_pool = AsyncWorkerPool(self.redis_client, 100)

+        self.batch_by_sync_id = {}
+
+    async def dispatch_batches(self):
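+        # Flush each accumulated per-sync_id batch to its pinned worker, then
+        # reset the map for the next read window.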
+        for sync_id in self.batch_by_sync_id:
+            await self.worker_pool.put(self.batch_by_sync_id[sync_id], sync_id)
+        self.batch_by_sync_id.clear()
+

     async def read_and_dispatch(self, csv_file, parser):
         print(f"dispatching from {csv_file}")
+
+        line_count = 0
+
         async with aiofiles.open(csv_file, mode="r", encoding="utf-8", newline="") as afp:
             async for row in AsyncReader(afp):
-                await self.worker_pool.put(parser.parse(row))
-
-    async def reportStats(self):
+                cmd = parser.parse(row)
+                if not self.batch_by_sync_id.get(cmd.sync_id):
+                    self.batch_by_sync_id[cmd.sync_id] = []
+                batch = self.batch_by_sync_id[cmd.sync_id]
+                batch.append(cmd)
+                line_count += 1
+                if line_count >= self.READ_BATCH_SIZE:
+                    await self.dispatch_batches()
+                    line_count = 0
+        # handle the remaining lines
+        await self.dispatch_batches()
+
+    async def report_stats(self):
         while True:
             info = await self.redis_client.execute_command("info", "stats")
             print(f"{datetime.now()}: {info}")
@@ -154,13 +167,12 @@ async def play(self, csv_file, parser) -> None:
         print(await self.redis_client.ping())

         read_dispatch_task = asyncio.create_task(self.read_and_dispatch(csv_file, parser))
-        stats_task = asyncio.create_task(self.reportStats())
+        stats_task = asyncio.create_task(self.report_stats())

         await read_dispatch_task
         print(f"finished reading {csv_file}")

         await self.worker_pool.stop()
-
         stats_task.cancel()

         print("all done")
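
For reference, a minimal sketch of how the player might be driven end to end. The Redis URI, worker count, and trace file name below are placeholder assumptions (not part of this commit), and TwitterCacheTraceParser is assumed to take no constructor arguments:

    import asyncio

    async def main():
        # hypothetical values; point these at a real Redis and trace file
        player = AsyncPlayer("localhost:6379", num_workers=100)
        await player.play("trace.csv", TwitterCacheTraceParser())

    asyncio.run(main())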