Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions sql/data_dictionary.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,181 @@
else:
res = {"status": 1, "msg": "非法调用!"}
return HttpResponse(
json.dumps(res, cls=ExtendJSONEncoder, bigint_as_string=True),

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
content_type="application/json",
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def view_list(request):
"""数据字典获取视图列表(仅MySQL)"""
return _dict_list(request, db_type_required="mysql", engine_method="get_views_list")


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def view_info(request):
"""数据字典获取视图详情(仅MySQL)"""
return _dict_detail(
request,
db_type_required="mysql",
engine_method="get_view_detail",
name_param="view_name",
engine_kwarg="view_name",
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def trigger_list(request):
"""数据字典获取触发器列表(仅MySQL)"""
return _dict_list(
request, db_type_required="mysql", engine_method="get_triggers_list"
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def trigger_info(request):
"""数据字典获取触发器详情(仅MySQL)"""
return _dict_detail(
request,
db_type_required="mysql",
engine_method="get_trigger_detail",
name_param="trigger_name",
engine_kwarg="trigger_name",
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def procedure_list(request):
"""数据字典获取存储过程列表(仅MySQL)"""
return _dict_list(
request, db_type_required="mysql", engine_method="get_procedures_list"
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def procedure_info(request):
"""数据字典获取存储过程详情(仅MySQL)"""
return _dict_detail(
request,
db_type_required="mysql",
engine_method="get_procedure_detail",
name_param="proc_name",
engine_kwarg="proc_name",
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def function_list(request):
"""数据字典获取函数列表(仅MySQL)"""
return _dict_list(
request, db_type_required="mysql", engine_method="get_functions_list"
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def function_info(request):
"""数据字典获取函数详情(仅MySQL)"""
return _dict_detail(
request,
db_type_required="mysql",
engine_method="get_function_detail",
name_param="func_name",
engine_kwarg="func_name",
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def event_list(request):
"""数据字典获取定时任务列表(仅MySQL)"""
return _dict_list(
request, db_type_required="mysql", engine_method="get_events_list"
)


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def event_info(request):
"""数据字典获取定时任务详情(仅MySQL)"""
return _dict_detail(
request,
db_type_required="mysql",
engine_method="get_event_detail",
name_param="event_name",
engine_kwarg="event_name",
)


def _dict_list(request, db_type_required, engine_method):
"""通用数据字典对象列表接口"""
instance_name = request.GET.get("instance_name", "")
db_name = request.GET.get("db_name", "")
db_type = request.GET.get("db_type", "")

if db_type_required and db_type != db_type_required:
res = {"status": 1, "msg": "仅MySQL支持该功能"}
return HttpResponse(
json.dumps(res, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type="application/json",
)

if instance_name and db_name:
try:
instance = Instance.objects.get(
instance_name=instance_name, db_type=db_type
)
Comment on lines +205 to +207
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Restrict dictionary list lookups to authorized instances

For users who have sql.menu_data_dictionary but are not assigned to a target instance's resource group, these new list endpoints still resolve Instance globally from request-controlled instance_name/db_type. A direct call to /data_dictionary/view_list/, /procedure_list/, etc. can enumerate object names and comments from instances the UI would not list for that user; use user_instances(request.user, db_type=[...]).get(...) before creating the engine.

Useful? React with 👍 / 👎.

query_engine = get_engine(instance=instance)
db_name = query_engine.escape_string(db_name)
data = getattr(query_engine, engine_method)(db_name=db_name)
Comment on lines +209 to +210
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Apply hidden-database filters before querying objects

The database dropdown is populated via instance_resource, which applies show_db_name_regex/denied_db_name_regex before exposing databases, but these new object endpoints accept any request db_name and immediately query it. On an otherwise authorized instance with denied databases, a direct call to /data_dictionary/procedure_list/ (or the other new list/detail endpoints) can enumerate routines/events/views in databases intentionally hidden from the user; reject db_name values that would be filtered before calling the engine.

Useful? React with 👍 / 👎.

Comment on lines +209 to +210
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep database names raw for parameterized object queries

When a MySQL database name contains characters that escape_string changes (for example a quoted identifier containing '), this pre-escaped value is then passed both as the connection database and as a bound parameter to the information_schema queries. The lookup targets the escaped spelling instead of the actual schema, so all of the new object list/detail APIs return empty data or connection errors for valid schema names; pass the raw db_name to parameterized calls and only escape when interpolating into SQL text.

Useful? React with 👍 / 👎.

res = {"status": 0, "data": data}
except Instance.DoesNotExist:
res = {"status": 1, "msg": "Instance.DoesNotExist"}
except Exception as e:
res = {"status": 1, "msg": str(e)}
else:
res = {"status": 1, "msg": "非法调用!"}
return HttpResponse(
json.dumps(res, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type="application/json",
)


def _dict_detail(request, db_type_required, engine_method, name_param, engine_kwarg):
"""通用数据字典对象详情接口"""
instance_name = request.GET.get("instance_name", "")
db_name = request.GET.get("db_name", "")
obj_name = request.GET.get(name_param, "")
db_type = request.GET.get("db_type", "")

if db_type_required and db_type != db_type_required:
res = {"status": 1, "msg": "仅MySQL支持该功能"}
return HttpResponse(
json.dumps(res, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type="application/json",
)

if instance_name and db_name and obj_name:
try:
instance = Instance.objects.get(
instance_name=instance_name, db_type=db_type
)
Comment on lines +240 to +242
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Restrict dictionary detail lookups to authorized instances

For users who have sql.menu_data_dictionary but are not assigned to a target instance's resource group, these new detail endpoints still fetch Instance globally from request-controlled instance_name/db_type. A direct call to /data_dictionary/procedure_info/, /function_info/, /event_info/, etc. can therefore return routine/event DDL from an instance the UI would not list for that user; use user_instances(request.user, db_type=[...]).get(...) (as the export path does) before creating the engine.

Useful? React with 👍 / 👎.

query_engine = get_engine(instance=instance)
db_name = query_engine.escape_string(db_name)
obj_name = query_engine.escape_string(obj_name)
Comment on lines +244 to +245
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Pass raw object names to parameterized detail queries

When an object name contains characters that escape_string changes (for example a procedure named with a single quote), this pre-escaped value is then used as a bound parameter in the information_schema equality checks inside the engine methods and also as the identifier for SHOW CREATE. The lookup no longer matches the actual object name, so the detail modal/API returns empty metadata or a failed SHOW CREATE for valid MySQL identifiers; keep the raw request value for parameterized queries and only escape when constructing an identifier.

Useful? React with 👍 / 👎.

data = getattr(query_engine, engine_method)(
**{"db_name": db_name, engine_kwarg: obj_name}
)
res = {"status": 0, "data": data}
except Instance.DoesNotExist:
res = {"status": 1, "msg": "Instance.DoesNotExist"}
except Exception as e:
res = {"status": 1, "msg": str(e)}
else:
res = {"status": 1, "msg": "非法调用!"}
return HttpResponse(
json.dumps(res, cls=ExtendJSONEncoder, bigint_as_string=True),

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
content_type="application/json",
)


def get_export_full_path(base_dir: str, instance_name: str, db_name: str) -> str:
"""validate if the instance_name and db_name provided is secure"""
fullpath = os.path.normpath(
Expand Down
40 changes: 40 additions & 0 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,46 @@ def get_tables_metas_data(self, db_name, **kwargs):
"""获取数据库所有表格信息,用作数据字典导出接口"""
return list()

def get_views_list(self, db_name, **kwargs):
"""获取视图列表, 返回 dict"""
return dict()

def get_view_detail(self, db_name, view_name, **kwargs):
"""获取视图详情, 返回 dict"""
return dict()

def get_triggers_list(self, db_name, **kwargs):
"""获取触发器列表, 返回 dict"""
return dict()

def get_trigger_detail(self, db_name, trigger_name, **kwargs):
"""获取触发器详情, 返回 dict"""
return dict()

def get_procedures_list(self, db_name, **kwargs):
"""获取存储过程列表, 返回 dict"""
return dict()

def get_procedure_detail(self, db_name, proc_name, **kwargs):
"""获取存储过程详情, 返回 dict"""
return dict()

def get_functions_list(self, db_name, **kwargs):
"""获取函数列表, 返回 dict"""
return dict()

def get_function_detail(self, db_name, func_name, **kwargs):
"""获取函数详情, 返回 dict"""
return dict()

def get_events_list(self, db_name, **kwargs):
"""获取定时任务列表, 返回 dict"""
return dict()

def get_event_detail(self, db_name, event_name, **kwargs):
"""获取定时任务详情, 返回 dict"""
return dict()

def get_all_databases_summary(self):
"""实例数据库管理功能,获取实例所有的数据库描述信息"""
return ResultSet()
Expand Down
Loading
Loading