Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Psycopg3 sync and async instrumentation #2146

Merged
merged 27 commits into from
Mar 19, 2024
Merged
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cf404fa
* psycopg3 instrumentation including asynchronous instrumentation
reiktar Jan 31, 2024
dc78394
* Updating documents
reiktar Jan 31, 2024
b2d490d
* adding Pullrequest link to changelog
reiktar Jan 31, 2024
0b12d9b
* adding github workflow for psycopg3 instrumentation testing
reiktar Jan 31, 2024
f8c349c
* updates to pass PR checks
reiktar Feb 2, 2024
f4abf01
Merge branch 'main' into psycopg3-instrumentation
reiktar Feb 5, 2024
a897b6c
Merge branch 'main' into psycopg3-instrumentation
reiktar Feb 20, 2024
057504a
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 1, 2024
953a113
* refactor on package name psycopg insteead of psycopg3
reiktar Mar 1, 2024
dafc587
* updating Changlog to reflect the new PR scope
reiktar Mar 1, 2024
8731ff9
* Cleaning up as per comments in #2146
reiktar Mar 4, 2024
9098939
* WRAPT_DISABLE_EXTENSIONS is nolonger required
reiktar Mar 4, 2024
9d2f79b
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 4, 2024
02d2295
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 11, 2024
b812e01
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 13, 2024
d38532a
* move changelog entry to unreleased section
reiktar Mar 14, 2024
f1f1780
* move changelog entry to unreleased section
reiktar Mar 14, 2024
f577330
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 14, 2024
8c95c88
* remove lingering newline
reiktar Mar 14, 2024
09890a4
* lingering refrerence to Psycopg3Instrumentor (hold over from the or…
reiktar Mar 14, 2024
dca2adb
Merge branch 'main' into psycopg3-instrumentation
lzchen Mar 14, 2024
6025de9
* linting + black
reiktar Mar 15, 2024
08e0c50
* Contribute.MD should point out that manually running black must use…
reiktar Mar 15, 2024
55b6f55
* isort and pylint disagrees on where to have the pylint options. Thi…
reiktar Mar 18, 2024
3543b7b
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 18, 2024
ee377d6
Merge branch 'main' into psycopg3-instrumentation
ocelotl Mar 18, 2024
9c450f4
Merge branch 'main' into psycopg3-instrumentation
lzchen Mar 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -25,11 +25,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `opentelemtetry-instrumentation-psycopg3` Async Instrumentation for psycopg 3.x
([#2146](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2146))
- `opentelemetry-instrumentation-psycopg` Initial release for psycopg 3.x

## Version 1.22.0/0.43b0 (2023-12-14)

### Added

- `opentelemetry-instrumentation-asyncio` Add support for asyncio
([#1919](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1943))
- `opentelemetry-instrumentation` Added Otel semantic convention opt-in mechanism
Original file line number Diff line number Diff line change
@@ -106,6 +106,7 @@
from typing import Collection

import psycopg
from psycopg import AsyncCursor as pg_async_cursor
from psycopg import Cursor as pg_cursor # pylint: disable=no-name-in-module
from psycopg.sql import Composed # pylint: disable=no-name-in-module

@@ -151,9 +152,36 @@ def _instrument(self, **kwargs):
commenter_options=commenter_options,
)

dbapi.wrap_connect(
__name__,
psycopg.Connection,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
)
dbapi.wrap_connect(
__name__,
psycopg.AsyncConnection,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiAsyncIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
)

def _uninstrument(self, **kwargs):
""" "Disable Psycopg instrumentation"""
dbapi.unwrap_connect(psycopg, "connect")
dbapi.unwrap_connect(psycopg.Connection, "connect")
dbapi.unwrap_connect(psycopg.AsyncConnection, "connect")

# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
@staticmethod
@@ -204,6 +232,26 @@ def wrapped_connection(
return connection


class DatabaseApiAsyncIntegration(dbapi.DatabaseApiIntegration):
async def wrapped_connection(
self,
connect_method: typing.Callable[..., typing.Any],
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
"""Add object proxy to connection object."""
base_cursor_factory = kwargs.pop("cursor_factory", None)
new_factory_kwargs = {"db_api": self}
if base_cursor_factory:
new_factory_kwargs["base_factory"] = base_cursor_factory
kwargs["cursor_factory"] = _new_cursor_async_factory(
**new_factory_kwargs
)
connection = await connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return connection


class CursorTracer(dbapi.CursorTracer):
def get_operation_name(self, cursor, args):
if not args:
@@ -259,3 +307,36 @@ def callproc(self, *args, **kwargs):
)

return TracedCursorFactory


def _new_cursor_async_factory(
db_api=None, base_factory=None, tracer_provider=None
):
if not db_api:
db_api = DatabaseApiAsyncIntegration(
__name__,
Psycopg3Instrumentor._DATABASE_SYSTEM,
connection_attributes=Psycopg3Instrumentor._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
base_factory = base_factory or pg_async_cursor
_cursor_tracer = CursorTracer(db_api)

class TracedCursorAsyncFactory(base_factory):
async def execute(self, *args, **kwargs):
return await _cursor_tracer.traced_execution(
self, super().execute, *args, **kwargs
)

async def executemany(self, *args, **kwargs):
return await _cursor_tracer.traced_execution(
self, super().executemany, *args, **kwargs
)

async def callproc(self, *args, **kwargs):
return await _cursor_tracer.traced_execution(
self, super().callproc, *args, **kwargs
)

return TracedCursorAsyncFactory
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import types
from unittest import mock

@@ -23,6 +24,11 @@
from opentelemetry.test.test_base import TestBase


def async_call(coro, *args, **kwargs):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro, *args, **kwargs)


class MockCursor:
execute = mock.MagicMock(spec=types.MethodType)
execute.__name__ = "execute"
@@ -45,6 +51,35 @@ def __exit__(self, *args):
return self


class MockAsyncCursor:
def __init__(self, *args, **kwargs):
pass

# pylint: disable=unused-argument, no-self-use
async def execute(self, query, params=None, throw_exception=False):
if throw_exception:
raise Exception("Test Exception")

# pylint: disable=unused-argument, no-self-use
async def executemany(self, query, params=None, throw_exception=False):
if throw_exception:
raise Exception("Test Exception")

# pylint: disable=unused-argument, no-self-use
async def callproc(self, query, params=None, throw_exception=False):
if throw_exception:
raise Exception("Test Exception")

async def __aenter__(self, *args, **kwargs):
return self

async def __aexit__(self, *args, **kwargs):
pass

def close(self):
pass


class MockConnection:
commit = mock.MagicMock(spec=types.MethodType)
commit.__name__ = "commit"
@@ -64,22 +99,71 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use
return {"dbname": "test"}


class MockAsyncConnection:
commit = mock.MagicMock(spec=types.MethodType)
commit.__name__ = "commit"

rollback = mock.MagicMock(spec=types.MethodType)
rollback.__name__ = "rollback"

def __init__(self, *args, **kwargs):
self.cursor_factory = kwargs.pop("cursor_factory", None)
pass

@classmethod
async def connect(*args, **kwargs):
return MockAsyncConnection(**kwargs)

def cursor(self):
if self.cursor_factory:
cur = self.cursor_factory(self)
print("Returning factory cursor", cur)
return cur
print("Returning MockAsyncCursor")
return MockAsyncCursor()

def get_dsn_parameters(self): # pylint: disable=no-self-use
return {"dbname": "test"}

async def __aenter__(self):
return self

async def __aexit__(self, *args):
return mock.MagicMock(spec=types.MethodType)


class TestPostgresqlIntegration(TestBase):
def setUp(self):
super().setUp()
self.cursor_mock = mock.patch(
"opentelemetry.instrumentation.psycopg.pg_cursor", MockCursor
)
self.cursor_async_mock = mock.patch(
"opentelemetry.instrumentation.psycopg.pg_async_cursor",
MockAsyncCursor,
)
self.connection_mock = mock.patch("psycopg.connect", MockConnection)
self.connection_sync_mock = mock.patch(
"psycopg.Connection.connect", MockConnection
)
self.connection_async_mock = mock.patch(
"psycopg.AsyncConnection.connect", MockAsyncConnection.connect
)

self.cursor_mock.start()
self.cursor_async_mock.start()
self.connection_mock.start()
self.connection_sync_mock.start()
self.connection_async_mock.start()

def tearDown(self):
super().tearDown()
self.memory_exporter.clear()
self.cursor_mock.stop()
self.cursor_async_mock.stop()
self.connection_mock.stop()
self.connection_sync_mock.stop()
self.connection_async_mock.stop()
with self.disable_logging():
PsycopgInstrumentor().uninstrument()

@@ -114,6 +198,93 @@ def test_instrumentor(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

# pylint: disable=unused-argument
def test_instrumentor_with_connection_class(self):
PsycopgInstrumentor().instrument()

cnx = psycopg.Connection.connect(database="test")

cursor = cnx.cursor()

query = "SELECT * FROM test"
cursor.execute(query)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.psycopg
)

# check that no spans are generated after uninstrument
PsycopgInstrumentor().uninstrument()

cnx = psycopg.Connection.connect(database="test")
cursor = cnx.cursor()
query = "SELECT * FROM test"
cursor.execute(query)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

def test_wrap_async_connection_class_with_cursor(self):
PsycopgInstrumentor().instrument()

async def test_async_connection():
acnx = await psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
async with cnx.cursor() as cursor:
await cursor.execute("SELECT * FROM test")

async_call(test_async_connection())
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.psycopg
)

# check that no spans are generated after uninstrument
PsycopgInstrumentor().uninstrument()

async_call(test_async_connection())

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

# pylint: disable=unused-argument
async def test_instrumentor_with_async_connection_class(self):
PsycopgInstrumentor().instrument()

async def test_async_connection():
acnx = await psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
await cnx.execute("SELECT * FROM test")

import asyncio

asyncio.run(test_async_connection)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.psycopg
)

# check that no spans are generated after uninstrument
PsycopgInstrumentor().uninstrument()
asyncio.run(test_async_connection())

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

def test_span_name(self):
PsycopgInstrumentor().instrument()

@@ -140,6 +311,34 @@ def test_span_name(self):
self.assertEqual(spans_list[4].name, "query")
self.assertEqual(spans_list[5].name, "query")

async def test_span_name_async(self):
PsycopgInstrumentor().instrument()

acnx = psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
async with cnx.cursor() as cursor:
await cursor.execute("Test query", ("param1Value", False))
await cursor.execute(
"""multi
line
query"""
)
await cursor.execute("tab\tseparated query")
await cursor.execute("/* leading comment */ query")
await cursor.execute(
"/* leading comment */ query /* trailing comment */"
)
await cursor.execute("query /* trailing comment */")

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 6)
self.assertEqual(spans_list[0].name, "Test")
self.assertEqual(spans_list[1].name, "multi")
self.assertEqual(spans_list[2].name, "tab")
self.assertEqual(spans_list[3].name, "query")
self.assertEqual(spans_list[4].name, "query")
self.assertEqual(spans_list[5].name, "query")

# pylint: disable=unused-argument
def test_not_recording(self):
mock_tracer = mock.Mock()
@@ -160,6 +359,27 @@ def test_not_recording(self):

PsycopgInstrumentor().uninstrument()

# pylint: disable=unused-argument
async def test_not_recording_async(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
PsycopgInstrumentor().instrument()
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
acnx = psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
async with cnx.cursor() as cursor:
query = "SELECT * FROM test"
cursor.execute(query)
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

PsycopgInstrumentor().uninstrument()

# pylint: disable=unused-argument
def test_custom_tracer_provider(self):
resource = resources.Resource.create({})
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -307,6 +307,7 @@ setenv =
; i.e: CORE_REPO_SHA=dde62cebffe519c35875af6d06fae053b3be65ec tox -e <env to test>
CORE_REPO_SHA={env:CORE_REPO_SHA:main}
CORE_REPO=git+https://github.com/open-telemetry/opentelemetry-python.git@{env:CORE_REPO_SHA}
WRAPT_DISABLE_EXTENSIONS="1"

changedir =
test-distro: opentelemetry-distro/tests