Skip to content

Commit 12aa2b2

Browse files
committed
Last part of refactoring and working on performance of the server.
1 parent 3f2b5f2 commit 12aa2b2

1 file changed

Lines changed: 97 additions & 105 deletions

File tree

server.py

Lines changed: 97 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,8 @@ async def new_session(self) -> Optional[int]:
139139
"round_robin_index": 0,
140140
"enqueue_seq": 0,
141141
"count_ack": 0,
142-
"count_fin": 0,
143-
"count_syn_ack": 0,
144142
"count_data": 0,
145143
"count_resend": 0,
146-
"count_syn": 0,
147-
"count_ping": 0,
148144
"track_ack": set(),
149145
"track_resend": set(),
150146
"track_types": set(),
@@ -214,28 +210,6 @@ def _touch_session(self, session_id: int) -> None:
214210
except Exception:
215211
pass
216212

217-
async def close_inactive_sessions(self, timeout: int = 300) -> None:
218-
now = time.monotonic()
219-
while self._session_expiry_heap and self._session_expiry_heap[0][0] <= now:
220-
try:
221-
expiry, session_id = heapq.heappop(self._session_expiry_heap)
222-
session = self.sessions.get(session_id)
223-
if not session:
224-
continue
225-
if now - session.get("last_packet_time", 0) > timeout:
226-
try:
227-
await self._close_session(session_id)
228-
self.logger.info(
229-
f"<yellow>Closed inactive session with ID: <cyan>{session_id}</cyan></yellow>"
230-
)
231-
except Exception as e:
232-
self.logger.debug(
233-
f"<red>Error closing session <cyan>{session_id}</cyan>: {e}</red>"
234-
)
235-
continue
236-
except Exception:
237-
break
238-
239213
async def _handle_session_init(
240214
self,
241215
data=None,
@@ -275,12 +249,12 @@ async def _handle_session_init(
275249
return response_packet
276250

277251
async def _session_cleanup_loop(self) -> None:
278-
"""Background task to periodically cleanup inactive sessions."""
279-
try:
280-
cleanup_interval = float(self.session_cleanup_interval)
281-
timeout_limit = self.session_timeout
252+
"""Background task to periodically cleanup inactive sessions (Crash-Proof)."""
253+
cleanup_interval = float(self.session_cleanup_interval)
254+
timeout_limit = self.session_timeout
282255

283-
while not self.should_stop.is_set():
256+
while not self.should_stop.is_set():
257+
try:
284258
await asyncio.sleep(cleanup_interval)
285259
now = time.monotonic()
286260

@@ -300,9 +274,11 @@ async def _session_cleanup_loop(self) -> None:
300274
self.logger.debug(
301275
f"<red>Error closing session <cyan>{sid}</cyan>: {e}</red>"
302276
)
303-
304-
except asyncio.CancelledError:
305-
pass
277+
except asyncio.CancelledError:
278+
break
279+
except Exception as e:
280+
self.logger.error(f"Unexpected error in session cleanup loop: {e}")
281+
await asyncio.sleep(1)
306282

307283
# ---------------------------------------------------------
308284
# Network I/O & Packet Processing
@@ -386,7 +362,6 @@ async def handle_vpn_packet(
386362
)
387363

388364
now_mono = time.monotonic()
389-
390365
self._touch_session(session_id)
391366

392367
stream_id = extracted_header.get("stream_id", 0) if extracted_header else 0
@@ -437,7 +412,14 @@ async def handle_vpn_packet(
437412
res_sn = 0
438413
res_ptype = Packet_Type.PONG
439414

440-
active_streams = [sid for sid, sdata in streams.items() if sdata["tx_queue"]]
415+
target_queue = None
416+
is_main = False
417+
selected_stream_data = None
418+
419+
main_queue = session.get("main_queue")
420+
active_streams = [
421+
sid for sid, sdata in streams.items() if sdata.get("tx_queue")
422+
]
441423

442424
if active_streams:
443425
num_active = len(active_streams)
@@ -446,48 +428,24 @@ async def handle_vpn_packet(
446428
rr_index = 0
447429

448430
selected_sid = active_streams[rr_index]
449-
stream_data = streams[selected_sid]
450-
target_queue = stream_data["tx_queue"]
431+
selected_stream_data = streams[selected_sid]
432+
t_queue = selected_stream_data["tx_queue"]
451433

452-
session["round_robin_index"] = (rr_index + 1) % num_active
434+
if main_queue and main_queue[0][0] < t_queue[0][0]:
435+
target_queue = main_queue
436+
is_main = True
437+
else:
438+
target_queue = t_queue
439+
session["round_robin_index"] = (rr_index + 1) % num_active
440+
elif main_queue:
441+
target_queue = main_queue
442+
is_main = True
453443

444+
if target_queue:
454445
item = heapq.heappop(target_queue)
455-
q_ptype, q_stream_id, q_sn = item[3], item[4], item[5]
456-
457-
if q_ptype == Packet_Type.STREAM_DATA:
458-
stream_data["track_data"].discard(q_sn)
459-
if stream_data["count_data"] > 0:
460-
stream_data["count_data"] -= 1
461-
elif q_ptype == Packet_Type.STREAM_DATA_ACK:
462-
stream_data["track_ack"].discard(q_sn)
463-
if stream_data["count_ack"] > 0:
464-
stream_data["count_ack"] -= 1
465-
elif q_ptype == Packet_Type.STREAM_RESEND:
466-
stream_data["track_resend"].discard(q_sn)
467-
if stream_data["count_resend"] > 0:
468-
stream_data["count_resend"] -= 1
469-
elif q_ptype == Packet_Type.STREAM_FIN:
470-
stream_data["track_fin"].discard(q_ptype)
471-
if stream_data["count_fin"] > 0:
472-
stream_data["count_fin"] -= 1
473-
elif q_ptype == Packet_Type.STREAM_SYN_ACK:
474-
stream_data["track_syn_ack"].discard(q_ptype)
475-
if stream_data["count_syn_ack"] > 0:
476-
stream_data["count_syn_ack"] -= 1
477-
478-
res_ptype, res_stream_id, res_sn, res_data = (
479-
q_ptype,
480-
q_stream_id,
481-
q_sn,
482-
item[6],
483-
)
484-
485-
if res_ptype == Packet_Type.PONG:
486-
main_queue = session.get("main_queue")
487-
if main_queue:
488-
item = heapq.heappop(main_queue)
489-
q_ptype, q_stream_id, q_sn = item[3], item[4], item[5]
446+
q_ptype, q_stream_id, q_sn = item[2], item[3], item[4]
490447

448+
if is_main:
491449
if q_ptype == Packet_Type.STREAM_DATA:
492450
session["track_data"].discard(q_sn)
493451
if session["count_data"] > 0:
@@ -506,13 +464,34 @@ async def handle_vpn_packet(
506464
Packet_Type.STREAM_SYN_ACK,
507465
):
508466
session["track_types"].discard(q_ptype)
467+
else:
468+
if q_ptype == Packet_Type.STREAM_DATA:
469+
selected_stream_data["track_data"].discard(q_sn)
470+
if selected_stream_data["count_data"] > 0:
471+
selected_stream_data["count_data"] -= 1
472+
elif q_ptype == Packet_Type.STREAM_DATA_ACK:
473+
selected_stream_data["track_ack"].discard(q_sn)
474+
if selected_stream_data["count_ack"] > 0:
475+
selected_stream_data["count_ack"] -= 1
476+
elif q_ptype == Packet_Type.STREAM_RESEND:
477+
selected_stream_data["track_resend"].discard(q_sn)
478+
if selected_stream_data["count_resend"] > 0:
479+
selected_stream_data["count_resend"] -= 1
480+
elif q_ptype == Packet_Type.STREAM_FIN:
481+
selected_stream_data["track_fin"].discard(q_ptype)
482+
if selected_stream_data["count_fin"] > 0:
483+
selected_stream_data["count_fin"] -= 1
484+
elif q_ptype == Packet_Type.STREAM_SYN_ACK:
485+
selected_stream_data["track_syn_ack"].discard(q_ptype)
486+
if selected_stream_data["count_syn_ack"] > 0:
487+
selected_stream_data["count_syn_ack"] -= 1
509488

510-
res_ptype, res_stream_id, res_sn, res_data = (
511-
q_ptype,
512-
q_stream_id,
513-
q_sn,
514-
item[6],
515-
)
489+
res_ptype, res_stream_id, res_sn, res_data = (
490+
q_ptype,
491+
q_stream_id,
492+
q_sn,
493+
item[5],
494+
)
516495

517496
if res_ptype == Packet_Type.PONG:
518497
res_data = b"PO:" + os.urandom(4)
@@ -771,7 +750,7 @@ async def _handle_mtu_up(
771750
async def close_stream(
772751
self, session_id: int, stream_id: int, reason: str = "Unknown"
773752
) -> None:
774-
"""Safely and fully close a specific stream and free resources."""
753+
"""Safely and fully close a specific stream, salvage pending FIN/ACKs, and free resources."""
775754
session = self.sessions.get(session_id)
776755
if not session:
777756
return
@@ -799,11 +778,15 @@ async def close_stream(
799778
)
800779

801780
try:
781+
for item in stream_data["tx_queue"]:
782+
heapq.heappush(session["main_queue"], item)
783+
802784
stream_data["tx_queue"].clear()
803785
stream_data["status"] = "CLOSED"
804786
except Exception:
805787
pass
806788

789+
# ۳. پاک کردن امن استریم
807790
session_streams.pop(stream_id, None)
808791

809792
async def _server_enqueue_tx(
@@ -838,10 +821,9 @@ async def _server_enqueue_tx(
838821
ptype = Packet_Type.STREAM_RESEND
839822
eff_priority = 1
840823

841-
now = time.time()
842824
session["enqueue_seq"] = (session.get("enqueue_seq", 0) + 1) & 0x7FFFFFFF
843825
seq = session["enqueue_seq"]
844-
queue_item = (eff_priority, seq, now, ptype, stream_id, sn, data)
826+
queue_item = (eff_priority, seq, ptype, stream_id, sn, data)
845827

846828
if stream_id == 0:
847829
if is_resend:
@@ -935,7 +917,6 @@ async def _handle_stream_syn(self, session_id, stream_id):
935917
"status": "PENDING",
936918
"arq_obj": None,
937919
"tx_queue": [], # heapq
938-
"total_packets": 0,
939920
"count_ack": 0,
940921
"count_fin": 0,
941922
"count_syn_ack": 0,
@@ -988,32 +969,43 @@ async def _handle_stream_syn(self, session_id, stream_id):
988969
)
989970

990971
async def _server_retransmit_loop(self):
991-
"""Background task to handle ARQ retransmissions for all active streams."""
972+
"""Background task to handle ARQ retransmissions for all active streams (Crash-Proof)."""
992973
while not self.should_stop.is_set():
993-
await asyncio.sleep(0.5)
994-
for session_id, session in list(self.sessions.items()):
995-
streams = session.get("streams", {})
996-
if not streams:
997-
continue
998-
999-
closed_ids = []
1000-
for sid, stream_data in streams.items():
1001-
arq_obj = stream_data.get("arq_obj")
1002-
if arq_obj and getattr(arq_obj, "closed", False):
1003-
closed_ids.append(sid)
974+
try:
975+
await asyncio.sleep(0.5)
976+
for session_id, session in list(self.sessions.items()):
977+
streams = session.get("streams", {})
978+
if not streams:
979+
continue
1004980

1005-
for sid in closed_ids:
1006-
await self.close_stream(
1007-
session_id, sid, reason="Marked Closed by ARQStream"
1008-
)
981+
closed_ids = []
982+
for sid, stream_data in streams.items():
983+
arq_obj = stream_data.get("arq_obj")
984+
if arq_obj and getattr(arq_obj, "closed", False):
985+
closed_ids.append(sid)
1009986

1010-
for sid, stream_data in list(streams.items()):
1011-
arq_obj = stream_data.get("arq_obj")
1012-
if arq_obj:
987+
for sid in closed_ids:
1013988
try:
1014-
await arq_obj.check_retransmits()
989+
await self.close_stream(
990+
session_id, sid, reason="Marked Closed by ARQStream"
991+
)
1015992
except Exception as e:
1016-
self.logger.error(f"Error in retransmit sid {sid}: {e}")
993+
self.logger.debug(
994+
f"Error closing stream {sid} during retransmit check: {e}"
995+
)
996+
997+
for sid, stream_data in list(streams.items()):
998+
arq_obj = stream_data.get("arq_obj")
999+
if arq_obj:
1000+
try:
1001+
await arq_obj.check_retransmits()
1002+
except Exception as e:
1003+
self.logger.error(f"Error in retransmit sid {sid}: {e}")
1004+
except asyncio.CancelledError:
1005+
break
1006+
except Exception as e:
1007+
self.logger.error(f"Unexpected error in retransmit loop: {e}")
1008+
await asyncio.sleep(0.5)
10171009

10181010
# ---------------------------------------------------------
10191011
# App Lifecycle

0 commit comments

Comments
 (0)