Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add events property to boto3.session.Session #204

Merged
merged 1 commit into from
Aug 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions boto3/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ def profile_name(self):
"""
return self._session.profile or 'default'

@property
def events(self):
"""
The event emitter for a session
"""
return self._session.get_component('event_emitter')

def _setup_loader(self):
"""
Setup loader paths so that we can load resources.
Expand Down
34 changes: 34 additions & 0 deletions tests/functional/test_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from tests import unittest

import boto3.session


class TestSession(unittest.TestCase):
def setUp(self):
self.session = boto3.session.Session(region_name='us-west-2')

def test_events_attribute(self):
# Create some function to register.
def my_handler(my_list, **kwargs):
return my_list.append('my_handler called')

# Register the handler to the event.
self.session.events.register('myevent', my_handler)

initial_list = []
# Emit the event.
self.session.events.emit('myevent', my_list=initial_list)
# Ensure that the registered handler was called.
self.assertEqual(initial_list, ['my_handler called'])
6 changes: 6 additions & 0 deletions tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,9 @@ def test_bad_resource_name(self):
session = Session(botocore_session=mock_bc_session)
with self.assertRaises(DataNotFoundError):
session.resource('sqs')

def test_can_reach_events(self):
mock_bc_session = self.bc_session_cls()
session = Session(botocore_session=mock_bc_session)
session.events
mock_bc_session.get_component.assert_called_with('event_emitter')