Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Update req id match 65 #302

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 62 additions & 12 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,8 +1442,11 @@ def __init__(self, group_key: str):
self.app_span_roots: List[SpanNode] = None
# 用于存放 `app_span` 的所有 leaf
self.app_span_leafs: List[SpanNode] = None
# 记录叶子节点的 syscall_trace_id, 用以匹配 s-p root
self.leaf_syscall_trace_id_request: Set[int] = set()
self.leaf_syscall_trace_id_response: Set[int] = set()
# 记录叶子节点的 x_request_id => index (in self.spans), 用以匹配 s-p root
self.leaf_x_request_id: Dict[str, List[int]] = {}
# 用于显示调用拓扑使用
self.subnet_id = None
self.subnet = None
Expand Down Expand Up @@ -1552,10 +1555,23 @@ def append_sys_span(self, sys_span: SysSpanNode):
self._set_extra_value_for_sys_span(sys_span)
self._set_auto_service(sys_span)
if sys_span.tap_side == TAP_SIDE_CLIENT_PROCESS:
self.leaf_syscall_trace_id_request.add(
sys_span.get_syscall_trace_id_request())
self.leaf_syscall_trace_id_response.add(
sys_span.get_syscall_trace_id_response())
cp_syscall_trace_id_req = sys_span.get_syscall_trace_id_request()
cp_syscall_trace_id_res = sys_span.get_syscall_trace_id_response()
cp_x_request_id_0 = sys_span.get_x_request_id_0()
cp_x_request_id_1 = sys_span.get_x_request_id_1()
if cp_syscall_trace_id_req:
self.leaf_syscall_trace_id_request.add(
sys_span.get_syscall_trace_id_request())
if cp_syscall_trace_id_res:
self.leaf_syscall_trace_id_response.add(
sys_span.get_syscall_trace_id_response())
if cp_x_request_id_0:
# index of sys_span = len(self.spans)-1
self.leaf_x_request_id.setdefault(
cp_x_request_id_0, []).append(len(self.spans) - 1)
if cp_x_request_id_1 and cp_x_request_id_1 != cp_x_request_id_0:
self.leaf_x_request_id.setdefault(
cp_x_request_id_1, []).append(len(self.spans) - 1)

def remove_server_sys_span(self, sys_span: SysSpanNode):
# 这里应该要做 append_sys_span 的逆操作(但对象仅为 ServerProcess sys_span)
Expand Down Expand Up @@ -1617,8 +1633,8 @@ def split_to_multiple_process_span_set(self) -> list:

# 构建一个并查集,将 spans 按 root 划分成多个子树
disjoint_set = DisjointSet()
# 避免 parent_index out of range
disjoint_set.disjoint_set = [-1] * max_flow_index
# 这里会跳索引,不是连续顺序,避免 index out of range,预分配大小
disjoint_set.disjoint_set = [-1] * (max_flow_index + 1)
for i in range(len(self.spans)):
parent_span_index = flow_index_to_span_index.get(
self.spans[i].get_parent_id(), -1)
Expand All @@ -1635,7 +1651,7 @@ def split_to_multiple_process_span_set(self) -> list:
# 极端情况下可能会有多个没有 parent_span_id 的入口,这里没法分辨它们的关系,不做拆分
if root_parent_span_id == '':
root_parent_span_id = "root" # 只是标记 root_parent_span_id,没有实际作用
if root_parent_span_id not in split_result:
if split_result.get(root_parent_span_id, None) is None:
newSet = ProcessSpanSet(root_parent_span_id)
newSet.app_span_roots = [self.spans[root_span_index]]
newSet._copy_meta_data_from(self)
Expand Down Expand Up @@ -1668,7 +1684,12 @@ def attach_sys_span_via_app_span(self, sys_span: SysSpanNode) -> bool:
return self._attach_client_sys_span(sys_span)

def _attach_server_sys_span(self, sys_span: SysSpanNode) -> bool:
# connection priority: span_id > syscall_trace_id > x_request_id
span_id_of_sys_span = sys_span.get_span_id()
syscall_trace_id_request = sys_span.get_syscall_trace_id_request()
syscall_trace_id_response = sys_span.get_syscall_trace_id_response()
x_request_id_0 = sys_span.get_x_request_id_0()
x_request_id_1 = sys_span.get_x_request_id_1()
if span_id_of_sys_span:
for app_root in self.app_span_roots:
if span_id_of_sys_span == app_root.get_parent_span_id():
Expand Down Expand Up @@ -1707,10 +1728,9 @@ def _attach_server_sys_span(self, sys_span: SysSpanNode) -> bool:
"s-p sys_span mounted due to same span_id as parent"
)
return True
else:
syscall_trace_id_request = sys_span.get_syscall_trace_id_request()
syscall_trace_id_response = sys_span.get_syscall_trace_id_response(
)

# span_id not matched, try syscall_trace_id
if syscall_trace_id_request or syscall_trace_id_response:
for app_root in self.app_span_roots:
# 如果 span_id 不存在,说明可能是入口 span,上游没有注入 span_id,此时根据叶子节点 c-p 的 syscall_trace_id 匹配即可
# 这里匹配可以严格点,s-p 和 c-p 只会同侧(req-req / res-res)相等,避免误关联一个独立的 c-p
Expand All @@ -1722,6 +1742,36 @@ def _attach_server_sys_span(self, sys_span: SysSpanNode) -> bool:
"s-p sys_span mounted due to syscall_trace_id matched c-p"
)
return True

# span_id/syscall not matched, try x_request_id
if x_request_id_0 or x_request_id_1:
# 场景:过 ingress/nginx 进入服务网关/服务,传递了 x_request_id,且作为首个 span 没有 trace_id/span_id
# 且发生跨线程调度,无法基于 syscall 关联时,允许通过 s-p.x_request_id(0/1) <=> c-p.x_request_id(0/1) 关联
# 此处已确保 auto_instance_id 一致 (即同一个进程)

# x_req_id 同侧相等: 透传 x_req_id,来自上游
# x_req_id 异侧相等: 注入 x_req_id,内部产生
x_req_id_matched = False
# 同一个进程内时间一定覆盖
for same_xreqid_idx in self.leaf_x_request_id.get(
x_request_id_0, []):
if sys_span.time_range_cover(self.spans[same_xreqid_idx]):
x_req_id_matched = True
if not x_req_id_matched:
for same_xreqid_idx in self.leaf_x_request_id.get(
x_request_id_1, []):
if sys_span.time_range_cover(self.spans[same_xreqid_idx]):
x_req_id_matched = True
if x_req_id_matched:
for app_root in self.app_span_roots:
if app_root.get_parent_id() < 0:
self.append_sys_span(sys_span)
app_root.set_parent(
sys_span,
"s-p sys_span mounted due to x_request_id matched c-p",
self.mounted_callback)
return True

return False

def _attach_client_sys_span(self, sys_span: SysSpanNode) -> bool:
Expand Down Expand Up @@ -2168,7 +2218,7 @@ def _union_sys_spans(

# 对 client_sys_spans 按 syscall_trace_id 划分为一个个集合
cp_disjoint_set = DisjointSet()
cp_disjoint_set.disjoint_set = [-1] * len(client_sys_spans)
cp_disjoint_set.disjoint_set = [-1] * (len(client_sys_spans) + 1)
for i in range(len(client_sys_spans)):
span = client_sys_spans[i]
if span.get_syscall_trace_id_response() > 0:
Expand Down