Skip to content

Commit 04b52f7

Browse files
MySQL blocking queries (#20008)
* blocking joins * blocking column * blocking column * idle subquery * idle blockers subquery as an option * changelog * linter * fixed variables * fixed variables * blocking_thread_id * idle session bug fix * config * cache the query * specify mysql version in the change log * validate config * linter * linter * test case * moved config to query_activity * validate config * linter * fixed idle blocker test * removed flakey lock assert * trigger ci * linter
1 parent 8ee8a6e commit 04b52f7

File tree

7 files changed

+104
-25
lines changed

7 files changed

+104
-25
lines changed

mysql/assets/configuration/spec.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,13 @@ files:
542542
value:
543543
type: number
544544
example: 10
545+
- name: collect_blocking_queries
546+
description: |
547+
Enable collection of blocking queries. Supported only on MySQL 8.0.
548+
value:
549+
type: boolean
550+
example: false
551+
display_default: false
545552
- name: index_metrics
546553
description: |
547554
Configure collection of index metrics.

mysql/changelog.d/20008.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add blocking queries support for MySQL 8

mysql/datadog_checks/mysql/activity.py

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,29 +58,53 @@
5858
waits_a.index_name,
5959
waits_a.object_type,
6060
waits_a.source
61+
{blocking_columns}
6162
FROM
6263
performance_schema.threads AS thread_a
6364
LEFT JOIN performance_schema.events_waits_current AS waits_a ON waits_a.thread_id = thread_a.thread_id
6465
LEFT JOIN performance_schema.events_statements_current AS statement ON statement.thread_id = thread_a.thread_id
66+
{blocking_joins}
6567
WHERE
66-
thread_a.processlist_state IS NOT NULL
67-
AND thread_a.processlist_command != 'Sleep'
68-
AND thread_a.processlist_id != CONNECTION_ID()
69-
AND thread_a.PROCESSLIST_COMMAND != 'Daemon'
70-
AND (waits_a.EVENT_NAME != 'idle' OR waits_a.EVENT_NAME IS NULL)
71-
AND (waits_a.operation != 'idle' OR waits_a.operation IS NULL)
72-
-- events_waits_current can have multiple rows per thread, thus we use EVENT_ID to identify the row we want to use.
73-
-- Additionally, we want the row with the highest EVENT_ID which reflects the most recent and current wait.
74-
AND (
75-
waits_a.event_id = (
76-
SELECT
77-
MAX(waits_b.EVENT_ID)
78-
FROM performance_schema.events_waits_current AS waits_b
79-
Where waits_b.thread_id = thread_a.thread_id
80-
) OR waits_a.event_id is NULL)
81-
-- We ignore rows without SQL text because there will be rows for background operations that do not have
82-
-- SQL text associated with it.
83-
AND COALESCE(statement.sql_text, thread_a.PROCESSLIST_info) != '';
68+
(
69+
thread_a.processlist_state IS NOT NULL
70+
AND thread_a.processlist_id != CONNECTION_ID()
71+
AND thread_a.PROCESSLIST_COMMAND != 'Daemon'
72+
AND thread_a.processlist_command != 'Sleep'
73+
AND (waits_a.EVENT_NAME != 'idle' OR waits_a.EVENT_NAME IS NULL)
74+
AND (waits_a.operation != 'idle' OR waits_a.operation IS NULL)
75+
-- events_waits_current can have multiple rows per thread, thus we use EVENT_ID to identify the row
76+
-- we want to use. Additionally, we want the row with the highest EVENT_ID which reflects the most recent wait.
77+
AND (
78+
waits_a.event_id = (
79+
SELECT
80+
MAX(waits_b.EVENT_ID)
81+
FROM performance_schema.events_waits_current AS waits_b
82+
Where waits_b.thread_id = thread_a.thread_id
83+
) OR waits_a.event_id is NULL)
84+
-- We ignore rows without SQL text because there will be rows for background operations that do not have
85+
-- SQL text associated with it.
86+
AND COALESCE(statement.sql_text, thread_a.PROCESSLIST_info) != ''
87+
)
88+
{idle_blockers_subquery};
89+
"""
90+
91+
BLOCKING_COLUMNS = """\
92+
,blocking_thread.thread_id AS blocking_thread_id,
93+
blocking_thread.processlist_id AS blocking_processlist_id
94+
"""
95+
96+
BLOCKING_JOINS = """\
97+
LEFT JOIN performance_schema.data_lock_waits AS lock_waits ON thread_a.thread_id = lock_waits.requesting_thread_id
98+
LEFT JOIN performance_schema.threads AS blocking_thread ON lock_waits.blocking_thread_id = blocking_thread.thread_id
99+
"""
100+
101+
IDLE_BLOCKERS_SUBQUERY = """\
102+
OR
103+
-- Include idle sessions that are blocking others
104+
thread_a.thread_id IN (
105+
SELECT blocking_thread_id
106+
FROM performance_schema.data_lock_waits
107+
)
84108
"""
85109

86110

@@ -127,6 +151,7 @@ def __init__(self, check, config, connection_args):
127151
self._db = None
128152
self._db_version = None
129153
self._obfuscator_options = to_native_string(json.dumps(self._config.obfuscator_options))
154+
self._activity_query = None
130155

131156
def run_job(self):
132157
# type: () -> None
@@ -180,11 +205,36 @@ def _collect_activity(self):
180205
tags=tags + self._check._get_debug_tags(),
181206
)
182207

208+
def _should_collect_blocking_queries(self):
209+
# type: () -> bool
210+
blocking_queries_configured = self._config.activity_config.get("collect_blocking_queries", False)
211+
return (
212+
blocking_queries_configured and self._db_version == MySQLVersion.VERSION_80 and not self._check.is_mariadb
213+
)
214+
215+
def _get_activity_query(self):
216+
# type: () -> str
217+
if self._activity_query:
218+
return self._activity_query
219+
blocking_columns = ""
220+
blocking_joins = ""
221+
idle_blockers_subquery = ""
222+
if self._should_collect_blocking_queries():
223+
blocking_columns = BLOCKING_COLUMNS
224+
blocking_joins = BLOCKING_JOINS
225+
idle_blockers_subquery = IDLE_BLOCKERS_SUBQUERY
226+
return ACTIVITY_QUERY.format(
227+
blocking_columns=blocking_columns,
228+
blocking_joins=blocking_joins,
229+
idle_blockers_subquery=idle_blockers_subquery,
230+
)
231+
183232
@tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
184233
def _get_activity(self, cursor):
185234
# type: (pymysql.cursor) -> List[Dict[str]]
186-
self._log.debug("Running activity query [%s]", ACTIVITY_QUERY)
187-
cursor.execute(ACTIVITY_QUERY)
235+
query = self._get_activity_query()
236+
self._log.debug("Running activity query [%s]", query)
237+
cursor.execute(query)
188238
return cursor.fetchall()
189239

190240
def _normalize_rows(self, rows):

mysql/datadog_checks/mysql/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def __init__(self, instance, init_config):
4848
self.activity_config = instance.get('query_activity', {}) or {}
4949
self.schemas_config: dict = instance.get('schemas_collection', {}) or {}
5050
self.index_config: dict = instance.get('index_metrics', {}) or {}
51+
self.collect_blocking_queries = is_affirmative(instance.get('collect_blocking_queries', False))
5152

5253
self.cloud_metadata = {}
5354
aws = instance.get('aws', {})

mysql/datadog_checks/mysql/config_models/instance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class QueryActivity(BaseModel):
130130
arbitrary_types_allowed=True,
131131
frozen=True,
132132
)
133+
collect_blocking_queries: Optional[bool] = None
133134
collection_interval: Optional[float] = None
134135
enabled: Optional[bool] = None
135136

mysql/datadog_checks/mysql/data/conf.yaml.example

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,11 @@ instances:
520520
#
521521
# collection_interval: 10
522522

523+
## @param collect_blocking_queries - boolean - optional - default: false
524+
## Enable collection of blocking queries. Supported only on MySQL 8.0.
525+
#
526+
# collect_blocking_queries: false
527+
523528
## Configure collection of index metrics.
524529
## Metrics provided by the options:
525530
## - mysql.index.size (per index)

mysql/tests/test_query_activity.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import time
99
from concurrent.futures.thread import ThreadPoolExecutor
1010
from contextlib import closing
11-
from copy import copy
11+
from copy import copy, deepcopy
1212
from datetime import datetime
1313
from threading import Event
1414

@@ -52,7 +52,7 @@ def dbm_instance(instance_complex):
5252
@pytest.mark.integration
5353
@pytest.mark.usefixtures('dd_environment')
5454
@pytest.mark.parametrize(
55-
"query,query_signature,expected_query_truncated",
55+
"query,query_signature,expected_query_truncated,collect_blocking_queries",
5656
[
5757
(
5858
'SELECT id, name FROM testdb.users FOR UPDATE',
@@ -62,6 +62,7 @@ def dbm_instance(instance_complex):
6262
else 'aca1be410fbadb61'
6363
),
6464
StatementTruncationState.not_truncated.value,
65+
True,
6566
),
6667
(
6768
'SELECT id, {} FROM testdb.users FOR UPDATE'.format(
@@ -73,11 +74,16 @@ def dbm_instance(instance_complex):
7374
else ('da7d6b1e9deb88e' if MYSQL_VERSION_PARSED > parse_version('5.7') else '63bd1fd025c7f7fb')
7475
),
7576
StatementTruncationState.truncated.value,
77+
False,
7678
),
7779
],
7880
)
79-
def test_activity_collection(aggregator, dbm_instance, dd_run_check, query, query_signature, expected_query_truncated):
80-
check = MySql(CHECK_NAME, {}, [dbm_instance])
81+
def test_activity_collection(
82+
aggregator, dbm_instance, dd_run_check, query, query_signature, expected_query_truncated, collect_blocking_queries
83+
):
84+
config = deepcopy(dbm_instance)
85+
config['query_activity']['collect_blocking_queries'] = collect_blocking_queries
86+
check = MySql(CHECK_NAME, {}, instances=[config])
8187

8288
blocking_query = 'SELECT id FROM testdb.users FOR UPDATE'
8389

@@ -148,9 +154,17 @@ def _run_blocking(conn):
148154
assert blocked_row['wait_timer_end'], "missing wait timer end"
149155
assert blocked_row['event_timer_start'], "missing event timer start"
150156
assert blocked_row['event_timer_end'], "missing event timer end"
151-
assert blocked_row['lock_time'], "missing lock time"
152157
assert blocked_row['query_truncated'] == expected_query_truncated
153158

159+
if check._query_activity._should_collect_blocking_queries():
160+
assert len(activity['mysql_activity']) >= 2, "should have collected at least two activity payloads"
161+
captured_idle_blocker = False
162+
for activity in dbm_activity:
163+
for row in activity['mysql_activity']:
164+
if row['processlist_user'] == 'bob':
165+
captured_idle_blocker = True
166+
assert captured_idle_blocker, "should have captured the idle blocker"
167+
154168

155169
@pytest.mark.integration
156170
@pytest.mark.usefixtures('dd_environment')

0 commit comments

Comments
 (0)