Skip to content

Commit

Permalink
mySQL schema data model updates (#19472)
Browse files — Browse the repository at this point in the history
* logging testing

* new dictionary shape brainstorm

* partitions portion of dictionary re-written

* Indexes shape change and more testing

* lint and changelog

* add subpart test, remove logging

* Delete mysql/changelog.d/19472.changed

* changelog

* delete log

* fix unit tests, review comments

* linter!!!

* more unit tests fixes

* mariadb version for collation
  • Loading branch information
azhou-datadog authored Jan 29, 2025
1 parent fd20b03 commit 3e22c06
Showing 7 changed files with 368 additions and 246 deletions.
1 change: 1 addition & 0 deletions mysql/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
@@ -385,6 +385,7 @@ files:
hidden: true
description: |
Configure collection of schemas (databases).
Only tables and schemas for which the user has been granted SELECT privileges are collected.
options:
- name: enabled
description: |
1 change: 1 addition & 0 deletions mysql/changelog.d/19472.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update mysql schema data model
152 changes: 116 additions & 36 deletions mysql/datadog_checks/mysql/databases_data.py
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
from ..stubs import datadog_agent
import json
import time
from collections import defaultdict
from contextlib import closing

import pymysql
@@ -20,6 +21,8 @@
SQL_DATABASES,
SQL_FOREIGN_KEYS,
SQL_INDEXES,
SQL_INDEXES_8_0_13,
SQL_INDEXES_EXPRESSION_COLUMN_CHECK,
SQL_PARTITION,
SQL_TABLES,
)
@@ -203,15 +206,17 @@ def _collect_databases_data(self, tags):
- indexes (list): A list of index dictionaries.
- index (dict): A dictionary representing an index.
- name (str): The name of the index.
- collation (str): The collation of the index.
- cardinality (str): The cardinality of the index.
- index_type (str): The type of the index.
- seq_in_index (str): The sequence in index.
- columns (str): The columns in the index.
- sub_parts (str): The sub-parts of the index.
- packed (str): Whether the index is packed.
- nullables (str): The nullable columns in the index.
- non_uniques (str): Whether the index is non-unique.
- cardinality (int): The cardinality of the index.
- index_type (str): The index method used.
- columns (list): A list of column dictionaries
- column (dict): A dictionary representing a column.
- name (str): The name of the column.
- sub_part (int): The number of indexed characters if column is partially indexed.
- collation (str): The collation of the column.
- packed (str): How the index is packed. NONE if it is not.
- nullable (bool): Whether the column is nullable.
- non_unique (bool): Whether the index can contain duplicates.
- expression (str): If index was built with a functional key part, the expression used.
- foreign_keys (list): A list of foreign key dictionaries.
- foreign_key (dict): A dictionary representing a foreign key.
- constraint_schema (str): The schema of the constraint.
@@ -223,21 +228,22 @@ def _collect_databases_data(self, tags):
- partitions (list): A list of partition dictionaries.
- partition (dict): A dictionary representing a partition.
- name (str): The name of the partition.
- subpartition_names (str): The names of the subpartitions.
- subpartitions (list): A list of subpartition dictionaries.
- subpartition (dict): A dictionary representing a subpartition.
- name (str): The name of the subpartition.
- subpartition_ordinal_position (int): The ordinal position of the subpartition.
- subpartition_method (str): The subpartition method.
- subpartition_expression (str): The subpartition expression.
- table_rows (int): The number of rows in the subpartition.
- data_length (int): The data length of the subpartition in bytes.
- partition_ordinal_position (str): The ordinal position of the partition.
- subpartition_ordinal_positions (str): The ordinal positions of the subpartitions.
- partition_method (str): The partition method.
- subpartition_methods (str): The subpartition methods.
- partition_expression (str): The partition expression.
- subpartition_expressions (str): The subpartition expressions.
- partition_description (str): The description of the partition.
- table_rows (str): The number of rows in the partition.
- data_lengths (str): The data lengths in the partition.
- max_data_lengths (str): The maximum data lengths in the partition.
- index_lengths (str): The index lengths in the partition.
- data_free (str): The free data space in the partition.
- partition_comment (str): The comment on the partition.
- tablespace_name (str): The tablespace name.
- table_rows (int): The number of rows in the partition. If partition has subpartitions,
this is the sum of all subpartitions table_rows.
- data_length (int): The data length of the partition in bytes. If partition has
subpartitions, this is the sum of all subpartitions data_length.
"""
self._data_submitter.reset()
self._tags = tags
@@ -262,14 +268,14 @@ def _fetch_for_databases(self, db_infos, cursor):
self._fetch_database_data(cursor, start_time, db_info['name'])
except StopIteration as e:
self._log.error(
"While executing fetch database data for databse {}, the following exception occured {}".format(
"While executing fetch database data for database {}, the following exception occured {}".format(
db_info['name'], e
)
)
return
except Exception as e:
self._log.error(
"While executing fetch database data for databse {}, the following exception occured {}".format(
"While executing fetch database data for database {}, the following exception occured {}".format(
db_info['name'], e
)
)
@@ -334,21 +340,54 @@ def _populate_with_columns_data(self, table_name_to_table_index, table_list, tab

@tracked_method(agent_check_getter=agent_check_getter)
def _populate_with_index_data(self, table_name_to_table_index, table_list, table_names, db_name, cursor):
self._cursor_run(cursor, query=SQL_INDEXES.format(table_names), params=db_name)
self._cursor_run(cursor, query=SQL_INDEXES_EXPRESSION_COLUMN_CHECK)
query = (
SQL_INDEXES_8_0_13.format(table_names)
if cursor.fetchone()["column_count"] > 0
else SQL_INDEXES.format(table_names)
)
self._cursor_run(cursor, query=query, params=db_name)
rows = cursor.fetchall()
if not rows:
return
table_index_dict = defaultdict(
lambda: defaultdict(
lambda: {
"name": None,
"cardinality": 0,
"index_type": None,
"columns": [],
"non_unique": None,
"expression": None,
}
)
)
for row in rows:
table_name = str(row.pop("table_name"))
table_name = str(row["table_name"])
table_list[table_name_to_table_index[table_name]].setdefault("indexes", [])
if "nullables" in row:
nullables_arr = row["nullables"].split(',')
nullables_converted = ""
for s in nullables_arr:
if s.lower() == "yes":
nullables_converted += "true,"
else:
nullables_converted += "false,"
row["nullables"] = nullables_converted[:-1]
table_list[table_name_to_table_index[table_name]]["indexes"].append(row)
index_name = str(row["name"])
index_data = table_index_dict[table_name][index_name]

# Update index-level info
index_data["name"] = index_name
index_data["cardinality"] = int(row["cardinality"])
index_data["index_type"] = str(row["index_type"])
index_data["non_unique"] = bool(row["non_unique"])
index_data["expression"] = str(row["expression"]) if row["expression"] else None

# Add column info, if exists
if row["column_name"]:
column = {
"name": row["column_name"],
"sub_part": int(row["sub_part"]) if row["sub_part"] else None,
"collation": str(row["collation"]) if row["collation"] else None,
"packed": str(row["packed"]) if row["packed"] else None,
"nullable": bool(row["nullable"].lower() == "yes"),
}
index_data["columns"].append(column)

for table_name, index_dict in table_index_dict.items():
table_list[table_name_to_table_index[table_name]]["indexes"] = list(index_dict.values())

@tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
def _populate_with_foreign_keys_data(self, table_name_to_table_index, table_list, table_names, db_name, cursor):
@@ -363,7 +402,48 @@ def _populate_with_foreign_keys_data(self, table_name_to_table_index, table_list
def _populate_with_partitions_data(self, table_name_to_table_index, table_list, table_names, db_name, cursor):
self._cursor_run(cursor, query=SQL_PARTITION.format(table_names), params=db_name)
rows = cursor.fetchall()
if not rows:
return
table_partitions_dict = defaultdict(
lambda: defaultdict(
lambda: {
"name": None,
"subpartitions": [],
"partition_ordinal_position": None,
"partition_method": None,
"partition_expression": None,
"partition_description": None,
"table_rows": 0,
"data_length": 0,
}
)
)

for row in rows:
table_name = str(row.pop("table_name"))
table_name = str(row["table_name"])
table_list[table_name_to_table_index[table_name]].setdefault("partitions", [])
table_list[table_name_to_table_index[table_name]]["partitions"].append(row)
partition_name = str(row["name"])
partition_data = table_partitions_dict[table_name][partition_name]

# Update partition-level info
partition_data["name"] = partition_name
partition_data["partition_ordinal_position"] = int(row["partition_ordinal_position"])
partition_data["partition_method"] = str(row["partition_method"])
partition_data["partition_expression"] = str(row["partition_expression"]).strip().lower()
partition_data["partition_description"] = str(row["partition_description"])
partition_data["table_rows"] += int(row["table_rows"])
partition_data["data_length"] += int(row["data_length"])

# Add subpartition info, if exists
if row["subpartition_name"]:
subpartition = {
"name": row["subpartition_name"],
"subpartition_ordinal_position": int(row["subpartition_ordinal_position"]),
"subpartition_method": str(row["subpartition_method"]),
"subpartition_expression": str(row["subpartition_expression"]).strip().lower(),
"table_rows": int(row["table_rows"]),
"data_length": int(row["data_length"]),
}
partition_data["subpartitions"].append(subpartition)
for table_name, partitions_dict in table_partitions_dict.items():
table_list[table_name_to_table_index[table_name]]["partitions"] = list(partitions_dict.values())
1 change: 0 additions & 1 deletion mysql/datadog_checks/mysql/metadata.py
Original file line number Diff line number Diff line change
@@ -83,7 +83,6 @@ def __init__(self, check, config, connection_args):
self._version_processed = False
self._connection_args = connection_args
self._db = None
self._check = check
self._databases_data = DatabasesData(self, check, config)
self._last_settings_collection_time = 0
self._last_databases_collection_time = 0
51 changes: 36 additions & 15 deletions mysql/datadog_checks/mysql/queries.py
Original file line number Diff line number Diff line change
@@ -115,22 +115,48 @@
WHERE table_schema = %s AND table_name IN ({});
"""

SQL_INDEXES_EXPRESSION_COLUMN_CHECK = """
SELECT COUNT(*) as column_count
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'information_schema'
AND TABLE_NAME = 'STATISTICS'
AND COLUMN_NAME = 'EXPRESSION';
"""

SQL_INDEXES = """\
SELECT
table_name as `table_name`,
index_schema as `index_schema`,
index_name as `name`,
collation as `collation`,
cardinality as `cardinality`,
index_type as `index_type`,
seq_in_index as `seq_in_index`,
column_name as `column_name`,
sub_part as `sub_part`,
packed as `packed`,
nullable as `nullable`,
non_unique as `non_unique`,
NULL as `expression`
FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_schema = %s AND table_name IN ({});
"""

SQL_INDEXES_8_0_13 = """\
SELECT
table_name as `table_name`,
index_name as `name`,
collation as `collation`,
cardinality as `cardinality`,
index_type as `index_type`,
group_concat(seq_in_index order by seq_in_index asc) as seq_in_index,
group_concat(column_name order by seq_in_index asc) as columns,
group_concat(sub_part order by seq_in_index asc) as sub_parts,
group_concat(packed order by seq_in_index asc) as packed,
group_concat(nullable order by seq_in_index asc) as nullables,
group_concat(non_unique order by seq_in_index asc) as non_uniques
seq_in_index as `seq_in_index`,
column_name as `column_name`,
sub_part as `sub_part`,
packed as `packed`,
nullable as `nullable`,
non_unique as `non_unique`,
expression as `expression`
FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_schema = %s AND table_name IN ({})
GROUP BY table_name, index_schema, index_name, collation, index_type;
WHERE table_schema = %s AND table_name IN ({});
"""

SQL_FOREIGN_KEYS = """\
@@ -163,12 +189,7 @@
subpartition_expression as `subpartition_expression`,
partition_description as `partition_description`,
table_rows as `table_rows`,
data_length as `data_length`,
max_data_length as `max_data_length`,
index_length as `index_length`,
data_free as `data_free`,
partition_comment as `partition_comment`,
tablespace_name as `tablespace_name`
data_length as `data_length`
FROM INFORMATION_SCHEMA.PARTITIONS
WHERE
table_schema = %s AND table_name in ({}) AND partition_name IS NOT NULL
10 changes: 7 additions & 3 deletions mysql/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -8,12 +8,13 @@
import mock
import pymysql
import pytest
from packaging.version import parse as parse_version

from datadog_checks.dev import TempDir, WaitFor, docker_run
from datadog_checks.dev.conditions import CheckDockerLogs

from . import common, tags
from .common import MYSQL_REPLICATION
from .common import MYSQL_REPLICATION, MYSQL_VERSION_PARSED

logger = logging.getLogger(__name__)

@@ -497,8 +498,11 @@ def add_schema_test_databases(cursor):

# create one column index
cursor.execute("CREATE INDEX single_column_index ON cities (population);")
# create two column index
cursor.execute("CREATE INDEX two_columns_index ON cities (id, name);")
# create two column index, one with subpart and descending
cursor.execute("CREATE INDEX two_columns_index ON cities (id, name(3) DESC);")
# create functional key part index - available after MySQL 8.0.13
if MYSQL_VERSION_PARSED >= parse_version('8.0.13') and MYSQL_FLAVOR == 'mysql':
cursor.execute("CREATE INDEX functional_key_part_index ON cities ((population + 1) DESC);")

cursor.execute(
"""CREATE TABLE landmarks (
Loading

0 comments on commit 3e22c06

Please sign in to comment.