Skip to content

Commit 83a605c

Browse files
committed
update based on rishabh's comments
Signed-off-by: Michael Oviedo <[email protected]>
1 parent 96b4be7 commit 83a605c

File tree

1 file changed

+51
-55
lines changed

1 file changed

+51
-55
lines changed

Diff for: osbenchmark/worker_coordinator/worker_coordinator.py

+51-55
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,9 @@ def __init__(self, client_id, request_metadata):
206206

207207
class SharedClientStateMessage:
208208
"""Message sent from the Worker to the FeedbackActor to share client state dictionaries"""
209-
def __init__(self, worker_id, dictionary):
209+
def __init__(self, worker_id, worker_clients_map):
210210
self.worker_id = worker_id
211-
self.worker_clients_map = dictionary
211+
self.worker_clients_map = worker_clients_map
212212

213213
class FeedbackState(Enum):
214214
"""Various states for the FeedbackActor"""
@@ -229,57 +229,73 @@ def __init__(self):
229229
self.shared_client_states = {}
230230
self.total_active_client_count = 0
231231
self.total_client_count = 0
232-
self.sleep_start_time = None
233-
self.last_error_time = None
234-
self.last_scaleup_time = None
232+
self.sleep_start_time = 0
233+
self.last_error_time = 0
234+
self.last_scaleup_time = 0
235235

236-
def handle_state(self):
237-
current_time = time.time()
236+
def receiveMsg_ClusterErrorMessage(self, msg, sender):
237+
self.last_error_time = time.perf_counter()
238+
self.logger.info("Feedback actor has received an error message.")
239+
if self.state == FeedbackState.SCALING_DOWN:
240+
self.logger.info("Already scaling down, ignoring error")
241+
return
242+
elif self.state == FeedbackState.SLEEP:
243+
self.logger.info("In sleep mode, ignoring error")
244+
return
245+
elif self.state == FeedbackState.NEUTRAL:
246+
# Add error message to queue
247+
try:
248+
self.messageQueue.put(msg)
249+
except queue.Full:
250+
self.logger.error("Queue is full - message dropped: %s", msg)
238251

252+
def receiveMsg_SharedClientStateMessage(self, msg, sender):
253+
try:
254+
self.shared_client_states[msg.worker_id] = msg.worker_clients_map
255+
self.total_client_count += len(msg.worker_clients_map)
256+
self.handle_state()
257+
except Exception as e:
258+
self.logger.error("Error processing client states: %s", e)
259+
260+
def receiveMsg_StartFeedbackActor(self, msg, sender):
261+
self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL))
262+
self.messageQueue = queue.Queue(maxsize=self.total_active_client_count)
263+
264+
def receiveMsg_WakeupMessage(self, msg, sender):
265+
# Upon waking up, check state
266+
self.handle_state()
267+
self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL))
268+
269+
def receiveUnrecognizedMessage(self, msg, sender):
270+
self.logger.info("Received unrecognized message: %s", msg)
271+
272+
def handle_state(self):
273+
current_time = time.perf_counter()
239274
if self.state == FeedbackState.SLEEP:
240275
# Check if we've slept for long enough to return to a normal state
241276
if current_time - self.sleep_start_time >= self.POST_SCALEDOWN_SECONDS:
242277
self.logger.info("Feedback Actor's sleep period complete, returning to NEUTRAL state")
243278
self.state = FeedbackState.NEUTRAL
244-
self.sleep_start_time = None
245-
return
246-
247-
if self.messageQueue.qsize() > 0:
279+
self.sleep_start_time = 0
280+
elif self.state == FeedbackState.SCALING_UP:
281+
self.logger.info("Scaling clients up...")
282+
self.scale_up()
283+
elif self.messageQueue.qsize() > 0:
248284
self.logger.info("Feedback Actor has received an error message, scaling down...")
249285
self.state = FeedbackState.SCALING_DOWN
250286
self.scale_down()
251287
self.logger.info("Clients scaled down. Number of active clients: %d", self.total_active_client_count)
252-
253-
if self.state == FeedbackState.NEUTRAL:
288+
elif self.state == FeedbackState.NEUTRAL:
254289
# Check if we've waited long enough since the last scaledown
255290
if current_time - self.last_error_time >= self.POST_SCALEDOWN_SECONDS:
256-
if (self.last_scaleup_time is None or
257-
current_time - self.last_scaleup_time >= self.WAKEUP_INTERVAL):
291+
if(current_time - self.last_scaleup_time >= self.WAKEUP_INTERVAL):
258292
self.logger.info("no errors in the last 30 seconds, scaling up")
259293
self.state = FeedbackState.SCALING_UP
260294
self.scale_up()
261-
self.last_scaleup_time = current_time
262295
else:
263296
self.logger.info("Cluster has errored too recently, waiting before scaling up")
264297

265-
def receiveMsg_ClusterErrorMessage(self, msg, sender):
266-
self.last_error_time = time.time()
267-
self.logger.info("Feedback actor has recevied an error message.")
268-
if self.state == FeedbackState.SCALING_DOWN:
269-
self.logger.info("Already scaling down, ignoring error")
270-
return
271-
elif self.state == FeedbackState.SLEEP:
272-
self.logger.info("In sleep mode, ignoring error")
273-
return
274-
elif self.state == FeedbackState.NEUTRAL:
275-
# Add error message to queue
276-
try:
277-
self.messageQueue.put(msg)
278-
except queue.Full:
279-
self.logger.error("Queue is full - message dropped: %s", msg)
280-
281298
def scale_down(self, scale_down_percentage=0.10):
282-
self.scaledown_timer = 0
283299
try:
284300
# calculate target number of clients to pause
285301
clients_to_pause = int(self.total_active_client_count * scale_down_percentage)
@@ -315,8 +331,7 @@ def scale_down(self, scale_down_percentage=0.10):
315331
# clear the message queue
316332
with self.messageQueue.mutex:
317333
self.messageQueue.clear()
318-
self.sleep_start_time = time.time()
319-
self.last_scaleup_time = None
334+
self.sleep_start_time = time.perf_counter()
320335

321336
def scale_up(self):
322337
try:
@@ -342,28 +357,9 @@ def scale_up(self):
342357
if not client_activated:
343358
self.logger.info("No inactive clients found to activate")
344359
finally:
360+
self.last_scaleup_time = time.perf_counter()
345361
self.state = FeedbackState.NEUTRAL
346362

347-
def receiveMsg_SharedClientStateMessage(self, msg, sender):
348-
try:
349-
self.shared_client_states[msg.worker_id] = msg.worker_clients_map
350-
self.total_client_count += len(msg.worker_clients_map)
351-
self.handle_state()
352-
except Exception as e:
353-
self.logger.error("Error processing client states: %s", e)
354-
355-
def receiveMsg_StartFeedbackActor(self, msg, sender):
356-
self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL))
357-
self.messageQueue = queue.Queue(maxsize=self.total_active_client_count)
358-
359-
def receiveMsg_WakeupMessage(self, msg, sender):
360-
# Upon waking up, check state
361-
self.handle_state()
362-
self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL))
363-
364-
def receiveUnrecognizedMessage(self, msg, sender):
365-
self.logger.info("Received unrecognized message: %s", msg)
366-
367363
class WorkerCoordinatorActor(actor.BenchmarkActor):
368364
RESET_RELATIVE_TIME_MARKER = "reset_relative_time"
369365

0 commit comments

Comments
 (0)