Skip to content

Commit

Permalink
Summarize db.statement in message (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmojaki authored Jul 20, 2024
1 parent e3b139d commit 8e2b7cd
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 7 deletions.
133 changes: 133 additions & 0 deletions logfire/_internal/db_statement_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from __future__ import annotations

import re
from typing import Any, Mapping

from opentelemetry.semconv.trace import SpanAttributes

# Statements at or under this many characters are used verbatim as the span message;
# longer statements are cleaned up and, if still too long, summarized.
MAX_QUERY_MESSAGE_LENGTH = 80


def message_from_db_statement(attributes: Mapping[str, Any], message: str | None, span_name: str) -> str | None:
    """Build a span message from the OTel `db.statement` attribute, when that is appropriate.

    Args:
        attributes: The span attributes; `db.statement` and `db.name` are read from here.
        message: The current message attribute of the span, if any.
        span_name: The name of the span.

    Returns: A new string to use as span message or None to keep the original message.
    """
    statement = attributes.get(SpanAttributes.DB_STATEMENT)
    if not isinstance(statement, str):
        # covers `None` and any other unexpected type
        return None

    if not (message is None or message == span_name):
        # a message that differs from the span name was set explicitly, so leave it alone
        return None

    statement = statement.strip()
    if isinstance(message, str):
        db_name = attributes.get(SpanAttributes.DB_NAME)
        has_db_suffix = bool(db_name) and isinstance(db_name, str) and message.endswith(db_name)
        # messages shaped like '<operation> <db name>' are reduced to '<operation>'
        operation = message[: -len(db_name) - 1] if has_db_suffix else message
        # only replace the message when the operation is a single word taken from the statement,
        # anything else means the message is custom and should be kept
        if operation not in statement or re.fullmatch(r'\S+', operation) is None:
            return None

    return summarize_query(statement)


# Pattern fragment for a table name: word characters, dots, quotes, backticks and hyphens.
# we don't use \S here since that includes "(" which is not valid in a table name
TABLE_RE = '[\\w."\'`-]+'
# `SELECT <expr> FROM <table>`: group 1 is the selected expression(s), group 2 the table.
# DOTALL lets the expression span multiple lines.
SELECT_RE = re.compile(rf'SELECT (.+?) FROM ({TABLE_RE})', flags=re.I | re.DOTALL)
# `WITH <ctes> SELECT <expr> FROM <table>`: group 1 is the CTE text, groups 2 and 3 as for SELECT_RE.
SELECT_CTE_RE = re.compile(rf'WITH (.+?) SELECT (.+?) FROM ({TABLE_RE})', flags=re.I | re.DOTALL)
# `SELECT <expr> FROM (<subquery>) AS <alias>`: group 2 is the parenthesised subquery, group 3 its alias.
SELECT_SUBQUERY_RE = re.compile(rf'SELECT (.+?) FROM (\(.+?\)) AS ({TABLE_RE})', flags=re.I | re.DOTALL)
# `INSERT INTO <table> (<columns>) VALUES (<values>)`; no DOTALL, so each parenthesised group must fit on one line.
INSERT_RE = re.compile(rf'INSERT INTO ({TABLE_RE}) (\(.+?\)) VALUES (\(.+?\))', flags=re.I)


def summarize_query(db_statement: str) -> str | None:
    """Produce a short, readable summary of a database statement (aimed at SQL).

    Statements that already fit in `MAX_QUERY_MESSAGE_LENGTH` characters, either as given or
    after comment lines and repeated whitespace are removed, are returned in full. Longer
    statements are summarized by shape (SELECT / CTE / subquery / INSERT), and anything that
    is not recognised goes through `fallback`.

    Args:
        db_statement: The database statement to summarize.

    Returns: A new string to use as span message or None to keep the original message.
    """
    db_statement = db_statement.strip()
    if len(db_statement) <= MAX_QUERY_MESSAGE_LENGTH:
        return db_statement

    # drop lines that start (after optional spaces/tabs) with a `--` comment
    without_comments = re.sub(r'^[ \t]*--.*', '', db_statement, flags=re.MULTILINE)

    # squash every run of two or more whitespace characters into one space;
    # this may alter string literals, which is acceptable for a summary
    cleaned = re.sub(r'\s{2,}', ' ', without_comments).strip()
    if len(cleaned) <= MAX_QUERY_MESSAGE_LENGTH:
        return cleaned

    # the subquery form is tried first, then the plain SELECT form
    subquery_match = SELECT_SUBQUERY_RE.match(cleaned)
    if subquery_match:
        expr, subquery, alias = subquery_match.groups()
        return select(expr, alias, match_end=subquery_match.end(), db_statement=cleaned, sub_query=subquery)

    plain_match = SELECT_RE.match(cleaned)
    if plain_match:
        expr, table = plain_match.groups()
        return select(expr, table, match_end=plain_match.end(), db_statement=cleaned)

    cte_match = SELECT_CTE_RE.match(cleaned)
    if cte_match:
        ctes, expr, table = cte_match.groups()
        return select(expr, table, match_end=cte_match.end(), db_statement=cleaned, ctes=ctes)

    insert_match = INSERT_RE.match(cleaned)
    if insert_match:
        table, columns, values = insert_match.groups()
        return f'INSERT INTO {table} {truncate(columns, 25)} VALUES {truncate(values, 25)}'

    return fallback(cleaned)


def select(
    expr: str, table: str, *, match_end: int, db_statement: str, ctes: str | None = None, sub_query: str | None = None
) -> str:
    """Build the summary message for a SELECT statement.

    Args:
        expr: The selected expression(s), i.e. the text between `SELECT` and `FROM`.
        table: The table selected from, or the alias of the subquery when `sub_query` is given.
        match_end: Index in `db_statement` where the matched `SELECT ... FROM ...` prefix ends;
            only the text after it is scanned for JOIN, WHERE and LIMIT.
        db_statement: The full (cleaned) statement.
        ctes: The text of the `WITH` clause, if the statement has one.
        sub_query: The parenthesised subquery selected from, if any.

    Returns: The summary, shortened with `truncate` to `MAX_QUERY_MESSAGE_LENGTH`.
    """
    expr = truncate(expr, 20)

    if sub_query:
        summary = f'SELECT {expr} FROM {truncate(sub_query, 25)} AS {truncate(table, 15)}'
    else:
        # 25 for table because this is the best identifier of the query
        summary = f'SELECT {expr} FROM {truncate(table, 25)}'

    if ctes:
        cte_as = re.findall(rf'({TABLE_RE}) AS', ctes, flags=re.I)
        if len(cte_as) == 1:
            summary = f'WITH {truncate(cte_as[0], 10)} AS (…) {summary}'
        else:
            summary = f'WITH …[{len(cte_as)} CTEs] {summary}'

    remainder = db_statement[match_end:]

    # The keyword searches below use `\b` so that identifiers which merely contain a keyword
    # (e.g. a column called `nowhere_id` or `rate_limit`) are not mistaken for a clause.
    joins = re.findall(rf'\bJOIN ({TABLE_RE})', remainder, flags=re.I)
    if len(joins) == 1:
        summary = f'{summary} JOIN {truncate(joins[0], 10)} ON …'
    elif joins:
        summary = f'{summary} …[{len(joins)} JOINs]'

    if re.search(r'\bWHERE\b', remainder, flags=re.I):
        summary = f'{summary} WHERE …'

    if limit := re.search(r'\bLIMIT (\d+)', remainder, flags=re.I):
        summary = f'{summary} LIMIT {limit.group(1)}'

    return truncate(summary, MAX_QUERY_MESSAGE_LENGTH)


def truncate(s: str, length: int) -> str:
    """Shorten `s` by replacing its middle with an ellipsis.

    Args:
        s: The string to shorten.
        length: Strings of at most this many characters are returned unchanged. Longer strings
            keep their first and last `length // 2` characters, joined by '…'.

    Returns: `s` itself, or the shortened form.
    """
    if len(s) <= length:
        return s

    half_length = length // 2
    if half_length <= 0:
        # `s[-0:]` is the whole string, so slicing here would return `s` untruncated
        return '…'
    return f'{s[:half_length]}…{s[-half_length:]}'


# Number of characters `fallback` keeps from each end of a statement.
# The 2 subtracted presumably leaves room for a separator between the two halves - TODO confirm.
FALLBACK_HALF = MAX_QUERY_MESSAGE_LENGTH // 2 - 2


def fallback(db_statement: str) -> str:
    """Summarize a statement of unrecognised shape by keeping only its two ends.

    Args:
        db_statement: The statement to shorten.

    Returns: The first and last `FALLBACK_HALF` characters of the statement joined by ' … ',
        so the reader can see that the middle was left out.
    """
    return f'{db_statement[:FALLBACK_HALF]} … {db_statement[-FALLBACK_HALF:]}'
10 changes: 10 additions & 0 deletions logfire/_internal/exporters/processor_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
PENDING_SPAN_NAME_SUFFIX,
log_level_attributes,
)
from ..db_statement_summary import message_from_db_statement
from ..scrubbing import BaseScrubber
from ..utils import ReadableSpanDict, is_instrumentation_suppressed, span_to_dict, truncate_string
from .wrapper import WrapperSpanProcessor
Expand Down Expand Up @@ -53,6 +54,7 @@ def on_end(self, span: ReadableSpan) -> None:
_tweak_asgi_send_receive_spans(span_dict)
_tweak_sqlalchemy_connect_spans(span_dict)
_tweak_http_spans(span_dict)
_summarize_db_statement(span_dict)
_set_error_level_and_status(span_dict)
self.scrubber.scrub_span(span_dict)
span = ReadableSpan(**span_dict)
Expand Down Expand Up @@ -241,3 +243,11 @@ def _tweak_http_spans(span: ReadableSpanDict):

if message != name:
span['attributes'] = {**attributes, ATTRIBUTES_MESSAGE_KEY: message}


def _summarize_db_statement(span: ReadableSpanDict):
    """Replace the span's message attribute with a summary of its db statement, when one is available.

    The span dict is modified in place; its attributes are replaced by a new mapping rather than mutated.
    """
    span_attributes = span['attributes']
    current_message: str | None = span_attributes.get(ATTRIBUTES_MESSAGE_KEY)  # type: ignore
    new_message = message_from_db_statement(span_attributes, current_message, span['name'])
    if new_message is None:
        # None means the existing message should be kept
        return
    span['attributes'] = {**span_attributes, ATTRIBUTES_MESSAGE_KEY: new_message}
17 changes: 10 additions & 7 deletions tests/otel_integrations/test_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class AuthRecord(Base):
'end_time': 4000000000,
'attributes': {
'logfire.span_type': 'span',
'logfire.msg': 'PRAGMA example.db',
'logfire.msg': 'PRAGMA main.table_info("auth_records")',
'db.statement': 'PRAGMA main.table_info("auth_records")',
'db.system': 'sqlite',
'db.name': 'example.db',
Expand All @@ -88,7 +88,7 @@ class AuthRecord(Base):
'end_time': 6000000000,
'attributes': {
'logfire.span_type': 'span',
'logfire.msg': 'PRAGMA example.db',
'logfire.msg': 'PRAGMA temp.table_info("auth_records")',
'db.statement': 'PRAGMA temp.table_info("auth_records")',
'db.system': 'sqlite',
'db.name': 'example.db',
Expand All @@ -102,7 +102,10 @@ class AuthRecord(Base):
'end_time': 8000000000,
'attributes': {
'logfire.span_type': 'span',
'logfire.msg': 'CREATE example.db',
'logfire.msg': """\
CREATE TABLE auth_records ( id INTEGER … t VARCHAR NOT NULL, PRIMARY KEY (id)
)\
""",
'db.statement': '\nCREATE TABLE auth_records (\n\tid INTEGER NOT NULL, \n\tnumber INTEGER NOT NULL, \n\tcontent VARCHAR NOT NULL, \n\tPRIMARY KEY (id)\n)\n\n',
'db.system': 'sqlite',
'db.name': 'example.db',
Expand Down Expand Up @@ -130,7 +133,7 @@ class AuthRecord(Base):
'end_time': 12000000000,
'attributes': {
'logfire.span_type': 'span',
'logfire.msg': 'select example.db',
'logfire.msg': 'select * from auth_records',
'db.statement': 'select * from auth_records',
'db.system': 'sqlite',
'db.name': 'example.db',
Expand All @@ -144,7 +147,7 @@ class AuthRecord(Base):
'end_time': 14000000000,
'attributes': {
'logfire.span_type': 'span',
'logfire.msg': 'INSERT example.db',
'logfire.msg': 'INSERT INTO auth_records (id, number, content) VALUES (?, ?, ?)',
'db.statement': 'INSERT INTO auth_records (id, number, content) VALUES (?, ?, ?)',
'db.system': 'sqlite',
'db.name': 'example.db',
Expand Down Expand Up @@ -172,7 +175,7 @@ class AuthRecord(Base):
'end_time': 18000000000,
'attributes': {
'logfire.span_type': 'span',
'logfire.msg': 'SELECT example.db',
'logfire.msg': 'SELECT auth_recor…ds_content FROM auth_records WHERE …',
'db.statement': 'SELECT auth_records.id AS auth_records_id, auth_records.number AS auth_records_number, auth_records.content AS auth_records_content \nFROM auth_records \nWHERE auth_records.id = ?',
'db.system': 'sqlite',
'db.name': 'example.db',
Expand All @@ -186,7 +189,7 @@ class AuthRecord(Base):
'end_time': 20000000000,
'attributes': {
'logfire.span_type': 'span',
'logfire.msg': 'DELETE example.db',
'logfire.msg': 'DELETE FROM auth_records WHERE auth_records.id = ?',
'db.statement': 'DELETE FROM auth_records WHERE auth_records.id = ?',
'db.system': 'sqlite',
'db.name': 'example.db',
Expand Down
155 changes: 155 additions & 0 deletions tests/test_db_statement_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from logfire._internal.db_statement_summary import message_from_db_statement


def test_no_db_statement():
    """Without a `db.statement` attribute no message is produced."""
    summary = message_from_db_statement({}, None, 'x')
    assert summary is None


def test_short_db_statement():
    """A short statement is used as the message unchanged."""
    attributes = {'db.statement': 'SELECT * FROM table'}
    summary = message_from_db_statement(attributes, None, 'x')
    assert summary == 'SELECT * FROM table'


def test_message_same():
    """A message equal to the span name is replaced by the statement."""
    attributes = {'db.statement': 'SELECT * FROM table'}
    summary = message_from_db_statement(attributes, 'SELECT', 'SELECT')
    assert summary == 'SELECT * FROM table'


def test_message_different():
    """A message that differs from the span name is kept."""
    attributes = {'db.statement': 'SELECT * FROM table'}
    summary = message_from_db_statement(attributes, 'SELECT', 'x')
    assert summary is None


def test_message_not_in_db_statement():
    """A message that does not occur in the statement is treated as custom and kept."""
    q = 'SELECT apple, banana, carrot, durian, egg, fig FROM table WHERE apple = 1'
    summary = message_from_db_statement({'db.statement': q}, 'not in statement', 'not in statement')
    assert summary is None


def test_message_multiword():
    """A multi-word message is kept even when it occurs in the statement."""
    q = 'SELECT apple, banana, carrot, durian, egg, fig FROM table WHERE apple = 1'
    summary = message_from_db_statement({'db.statement': q}, 'SELECT apple', 'SELECT apple')
    assert summary is None


def test_ok_after_clean():
    """A statement that fits once its comment line is removed is returned in full."""
    q = """
-- this is a long comment about the sql
SELECT apple, banana, carrot, durian, egg, fig FROM table
"""
    summary = message_from_db_statement({'db.statement': q}, None, 'x')
    assert summary == 'SELECT apple, banana, carrot, durian, egg, fig FROM table'


def attrs(q: str):
    """Return span attributes for statement `q` on a postgresql system."""
    attributes = {'db.statement': q, 'db.system': 'postgresql'}
    return attributes


def test_query_rewritten():
    """A long SELECT is summarized with a shortened expression list and a WHERE marker."""
    q = 'SELECT apple, banana, carrot, durian, egg, fig FROM table WHERE apple = 1 and banana = 2'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT apple, ban…, egg, fig FROM table WHERE …'


def test_invalid_sql():
    """An unbalanced quote in the table name does not prevent summarizing."""
    q = 'SELECT apple, banana, carrot, durian, egg, fig FROM "table WHERE apple = 1 offset 12345678901234567890'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT apple, ban…, egg, fig FROM "table WHERE …'


def test_one_cte():
    """A single CTE is shown by name with its body elided."""
    q = 'WITH foobar AS (SELECT apple, banana, carrot, durian FROM table) SELECT * FROM foobar where x = 1'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'WITH foobar AS (…) SELECT * FROM foobar WHERE …'


def test_one_cte_long():
    """A long CTE name is shortened in the summary."""
    q = 'WITH foobar_foobar_foobar AS (SELECT apple, banana, carrot FROM table) SELECT * FROM foobar where x = 1'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'WITH fooba…oobar AS (…) SELECT * FROM foobar WHERE …'


def test_two_ctes():
    """Several CTEs are reported as a count."""
    q = 'WITH foo AS (SELECT * FROM table), bar AS (SELECT apple, banana, carrot, durian FROM foo) SELECT * FROM bar'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'WITH …[2 CTEs] SELECT * FROM bar'


def test_long_select():
    """A long expression list is shortened; the trailing OFFSET is dropped."""
    q = '\nSELECT apple, banana, carrot, durian, egg, fig, grape FROM table offset 12345678901234567890'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT apple, ban…fig, grape FROM table'


def test_from_subquery():
    """Selecting from a subquery keeps the subquery and its alias in the summary."""
    q = 'select * from (select * from table) as sub where aaaaa_bbbb_cccccc=1 offset 12345678901234567890'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT * FROM (select * from table) AS sub WHERE …'


def test_from_quoted():
    """A quoted table name is kept with its quotes; its alias is dropped."""
    q = 'select * from "foo.bar" as sub where aaaaa_bbbb_cccccc=1 offset 12345678901234567890'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT * FROM "foo.bar" WHERE …'


def test_from_long():
    """A longer quoted, dotted table name is still shown in full."""
    q = 'select * from "aaaaa.bbbb.cccccc" as sub where aaaaa_bbbb_cccccc=1 offset 12345678901234567890'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT * FROM "aaaaa.bbbb.cccccc" WHERE …'


def test_one_join():
    """A single JOIN is shown with its table and an elided ON clause."""
    q = ' SELECT apple, banana, carrot FROM table JOIN other ON table.id = other.id offset 12345678901234567890'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT apple, ban…na, carrot FROM table JOIN other ON …'


def test_one_join_long():
    """A long joined table name is shortened in the summary."""
    q = ' SELECT apple, banana, carrot FROM table JOIN other_other_other ON table.id = other_other_other.id'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT apple, ban…na, carrot FROM table JOIN other…other ON …'


def test_two_joins():
    """Several JOINs are reported as a count."""
    q = 'SELECT * FROM table JOIN other ON table.id = other.id JOIN another ON table.id = another.id'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT * FROM table …[2 JOINs]'


def test_where():
    """A lowercase `where` is recognised and its conditions are elided."""
    q = 'SELECT apple, banana, carrot, durian, egg FROM table where a = 1 and b = 2 and c =3'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT apple, ban…urian, egg FROM table WHERE …'


def test_limit():
    """The LIMIT value is kept in the summary."""
    q = 'SELECT apple, banana, carrot, durian, egg, fig, grape FROM table where apple=12345678901234567890 limit 10'
    summary = message_from_db_statement(attrs(q), None, 'SELECT')
    assert summary == 'SELECT apple, ban…fig, grape FROM table WHERE … LIMIT 10'


def test_update():
    """UPDATE has no dedicated summary, so both ends of the statement are kept."""
    q = 'UPDATE table set apple = 1 where banana = 2 and carrrrrrrot = 3 and durian = 4 and egg = 5 and fig = 6'
    summary = message_from_db_statement(attrs(q), None, 'UPDATE')
    assert summary == 'UPDATE table set apple = 1 where banan … and durian = 4 and egg = 5 and fig = 6'


def test_insert():
    """An INSERT keeps its table; the long column list is shortened, the short value list is not."""
    q = 'INSERT INTO table (apple, banana, carrot, durian, egg, fig) VALUES (1, 2, 3, 4, 5, 6)'
    summary = message_from_db_statement(attrs(q), None, 'INSERT')
    assert summary == 'INSERT INTO table (apple, bana…n, egg, fig) VALUES (1, 2, 3, 4, 5, 6)'

0 comments on commit 8e2b7cd

Please sign in to comment.