-
Notifications
You must be signed in to change notification settings - Fork 1.2k
fix(queuefs): dedupe memory semantic parent enqueues (#769) #792
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,9 @@ | ||
| # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """SemanticQueue: Semantic extraction queue.""" | ||
| """Semantic extraction queue.""" | ||
|
|
||
| import threading | ||
| import time | ||
| from typing import Optional | ||
|
|
||
| from openviking_cli.utils.logger import get_logger | ||
|
|
@@ -11,12 +13,44 @@ | |
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
| # Coalesce rapid re-enqueues for the same memory parent directory (github #769). | ||
| _MEMORY_PARENT_SEMANTIC_DEDUPE_SEC = 45.0 | ||
|
|
||
|
|
||
| class SemanticQueue(NamedQueue): | ||
| """Semantic extraction queue for async generation of .abstract.md and .overview.md.""" | ||
|
|
||
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
| self._memory_parent_semantic_last: dict[str, float] = {} | ||
| self._memory_parent_semantic_lock = threading.Lock() | ||
|
|
||
| @staticmethod | ||
| def _memory_parent_semantic_key(msg: SemanticMsg) -> str: | ||
| return f"{msg.account_id}|{msg.user_id}|{msg.agent_id}|{msg.uri}" | ||
|
|
||
| async def enqueue(self, msg: SemanticMsg) -> str: | ||
| """Serialize SemanticMsg object and store in queue.""" | ||
| if msg.context_type == "memory": | ||
| key = self._memory_parent_semantic_key(msg) | ||
| now = time.monotonic() | ||
| with self._memory_parent_semantic_lock: | ||
| last = self._memory_parent_semantic_last.get(key, 0.0) | ||
| if now - last < _MEMORY_PARENT_SEMANTIC_DEDUPE_SEC: | ||
| logger.debug( | ||
| "[SemanticQueue] Skipping duplicate memory semantic enqueue for %s " | ||
| "(within %.0fs dedupe window; see #769)", | ||
| msg.uri, | ||
| _MEMORY_PARENT_SEMANTIC_DEDUPE_SEC, | ||
| ) | ||
| return "deduplicated" | ||
| self._memory_parent_semantic_last[key] = now | ||
| if len(self._memory_parent_semantic_last) > 2000: | ||
| cutoff = now - (_MEMORY_PARENT_SEMANTIC_DEDUPE_SEC * 4) | ||
| stale = [k for k, t in self._memory_parent_semantic_last.items() if t < cutoff] | ||
| for k in stale[:800]: | ||
| self._memory_parent_semantic_last.pop(k, None) | ||
|
Comment on lines +48 to +52
|
||
|
|
||
| return await super().enqueue(msg.to_dict()) | ||
|
|
||
| async def dequeue(self) -> Optional[SemanticMsg]: | ||
|
|
@@ -39,7 +73,7 @@ async def dequeue(self) -> Optional[SemanticMsg]: | |
| return None | ||
|
|
||
| async def peek(self) -> Optional[SemanticMsg]: | ||
| """Peek at queue head message.""" | ||
| """Peek at message from queue.""" | ||
| data_dict = await super().peek() | ||
| if not data_dict: | ||
| return None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| """Tests for memory-context semantic enqueue deduplication (#769).""" | ||
|
|
||
| from unittest.mock import AsyncMock, MagicMock, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from openviking.storage.queuefs.named_queue import NamedQueue | ||
| from openviking.storage.queuefs.semantic_msg import SemanticMsg | ||
| from openviking.storage.queuefs.semantic_queue import SemanticQueue | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_memory_semantic_enqueue_deduped_within_window(): | ||
| mock_agfs = MagicMock() | ||
| with patch.object(NamedQueue, "enqueue", new_callable=AsyncMock) as named_enqueue: | ||
| named_enqueue.return_value = "queued-id" | ||
| q = SemanticQueue(mock_agfs, "/queue", "semantic") | ||
| msg = SemanticMsg( | ||
| uri="viking://user/default/memories/entities", | ||
| context_type="memory", | ||
| account_id="acc", | ||
| user_id="u1", | ||
| agent_id="a1", | ||
| ) | ||
| r1 = await q.enqueue(msg) | ||
| r2 = await q.enqueue( | ||
| SemanticMsg( | ||
| uri="viking://user/default/memories/entities", | ||
| context_type="memory", | ||
| account_id="acc", | ||
| user_id="u1", | ||
| agent_id="a1", | ||
| ) | ||
| ) | ||
| assert r1 == "queued-id" | ||
| assert r2 == "deduplicated" | ||
| assert named_enqueue.call_count == 1 | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_memory_semantic_enqueue_different_uri_not_deduped(): | ||
| mock_agfs = MagicMock() | ||
| with patch.object(NamedQueue, "enqueue", new_callable=AsyncMock) as named_enqueue: | ||
| named_enqueue.return_value = "queued-id" | ||
| q = SemanticQueue(mock_agfs, "/queue", "semantic") | ||
| await q.enqueue( | ||
| SemanticMsg( | ||
| uri="viking://user/default/memories/entities", | ||
| context_type="memory", | ||
| ) | ||
| ) | ||
| await q.enqueue( | ||
| SemanticMsg( | ||
| uri="viking://user/default/memories/patterns", | ||
| context_type="memory", | ||
| ) | ||
| ) | ||
| assert named_enqueue.call_count == 2 | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_non_memory_context_not_deduped(): | ||
| mock_agfs = MagicMock() | ||
| with patch.object(NamedQueue, "enqueue", new_callable=AsyncMock) as named_enqueue: | ||
| named_enqueue.return_value = "queued-id" | ||
| q = SemanticQueue(mock_agfs, "/queue", "semantic") | ||
| uri = "viking://resources/docs" | ||
| await q.enqueue(SemanticMsg(uri=uri, context_type="resource")) | ||
| await q.enqueue(SemanticMsg(uri=uri, context_type="resource")) | ||
| assert named_enqueue.call_count == 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`enqueue()` is async but uses a `threading.Lock()` critical section. Under contention this can block the event loop (and it currently includes a `logger.debug(...)` call inside the lock). Consider using an `asyncio.Lock` if this is only accessed from the event loop, or otherwise minimizing the locked region (compute the decision under lock, then log/return after releasing).