From 8ce13d1f7d4423ddaf0e7910781199be9b90ce40 Mon Sep 17 00:00:00 2001 From: Rajat Gupta <35985127+rjt-gupta@users.noreply.github.com> Date: Sat, 8 Jun 2019 11:43:06 +0530 Subject: [PATCH] SQLite & SQLiteDB Helper Tests (#327) * sqlite test * CI fix * get_abs_path * Update change Co-Authored-By: Ravinder Nehra --- tanner/tests/test_sqlite.py | 84 ++++++++++++---- tanner/tests/test_sqlite_db_helper.py | 135 ++++++++++++++++++++++++++ 2 files changed, 199 insertions(+), 20 deletions(-) create mode 100644 tanner/tests/test_sqlite_db_helper.py diff --git a/tanner/tests/test_sqlite.py b/tanner/tests/test_sqlite.py index dc2686ad..d09f7677 100644 --- a/tanner/tests/test_sqlite.py +++ b/tanner/tests/test_sqlite.py @@ -4,6 +4,7 @@ import unittest from unittest import mock +from tanner.utils.asyncmock import AsyncMock from tanner.emulators import sqlite @@ -15,40 +16,83 @@ def setUp(self): os.makedirs(os.path.dirname(self.filename), exist_ok=True) open('/tmp/db/test_db', 'a').close() # Insert some testing data - conn = sqlite3.connect(self.filename) - self.cursor = conn.cursor() + self.conn = sqlite3.connect(self.filename) + self.cursor = self.conn.cursor() self.cursor.execute('CREATE TABLE test (id INTEGER PRIMARY KEY, username TEXT)') self.cursor.execute('INSERT INTO TEST VALUES(0, "test0")') - conn.commit() + self.conn.commit() self.handler = sqlite.SQLITEEmulator('test_db', '/tmp/') + self.returned_result = None + self.expected_result = None - def tearDown(self): - if os.path.exists(self.filename): + def test_setup_db(self): + + self.handler.helper.create_query_map = mock.Mock( + return_value={'test': [{'name': 'id', 'type': 'INTEGER'}, {'name': 'username', 'type': 'TEXT'}]}) + self.handler.helper.setup_db_from_config = mock.Mock() + + async def test(): + self.returned_result = await self.handler.setup_db() + + self.loop.run_until_complete(test()) + self.handler.helper.create_query_map.assert_called_with('/tmp/db/', 'test_db') + assert not self.handler.helper.setup_db_from_config.called + + def test_setup_db_not_exists(self): + + self.handler.helper.create_query_map = mock.Mock( + return_value={'test': [{'name': 'id', 'type': 'INTEGER'}, {'name': 'username', 'type': 'TEXT'}]}) + self.handler.helper.setup_db_from_config = AsyncMock() + + async def test(): os.remove(self.filename) + self.returned_result = await self.handler.setup_db() + + self.loop.run_until_complete(test()) + self.handler.helper.setup_db_from_config.assert_called_with('/tmp/db/', 'test_db') + self.handler.helper.create_query_map.assert_called_with('/tmp/db/', 'test_db') - def test_db_copy(self): + def test_create_attacker_db(self): session = mock.Mock() session.sess_uuid.hex = 'd877339ec415484987b279469167af3d' self.loop.run_until_complete(self.handler.create_attacker_db(session)) self.assertTrue(os.path.exists('/tmp/db/attacker_d877339ec415484987b279469167af3d')) - def test_create_query_map(self): - result = self.handler.helper.create_query_map('/tmp/db', 'test_db') - assert_result = {'test': [{'name': 'id', 'type': 'INTEGER'}, {'name': 'username', 'type': 'TEXT'}]} - self.assertEqual(result, assert_result) + def test_execute_query(self): - def test_insert_dummy_data(self): - def mock_generate_dummy_data(data_tokens): - return [(1, 'test1'), (2, 'test2')], ['I', 'L'] + self.expected_result = [[[1, 'test_name']], [[1, 'test@domain.com', 'test_pass']]] - self.handler.helper.generate_dummy_data = mock_generate_dummy_data + result = [] + self.query = [ + ['/tmp/db/TEST_DB', "CREATE TABLE IF NOT EXISTS TEST (ID INTEGER PRIMARY KEY, USERNAME TEXT)", + 'INSERT INTO TEST VALUES(1, "test_name")'], + ['/tmp/db/CREDS_DB', "CREATE TABLE IF NOT EXISTS CREDS (ID INTEGER PRIMARY KEY, EMAIL VARCHAR(15), " + "PASSWORD VARCHAR(15))", "INSERT INTO CREDS VALUES(1, 'test@domain.com', 'test_pass')"] + ] + test_query = [['/tmp/db/TEST_DB', 'SELECT * FROM TEST'], ['/tmp/db/CREDS_DB', 'SELECT * FROM CREDS']] - self.loop.run_until_complete(self.handler.helper.insert_dummy_data('test', 'I,L', self.cursor)) - assert_result = [[0, 'test0'], [1, 'test1'], [2, 'test2']] + def setup(data): + os.makedirs(os.path.dirname(data[0]), exist_ok=True) + self.conn = sqlite3.connect(data[0]) + self.cursor = self.conn.cursor() + self.cursor.execute(data[1]) + self.cursor.execute(data[2]) + self.conn.commit() - result = [] - for row in self.cursor.execute('SELECT * FROM test;'): - result.append(list(row)) + for data in self.query: + setup(data) + + async def test(data): + self.returned_result = await self.handler.execute_query(data[1], data[0]) + result.append(self.returned_result) + self.handler.helper.delete_db(data[0]) + + for query in test_query: + self.loop.run_until_complete(test(query)) - self.assertEqual(result, assert_result) + self.assertEqual(self.expected_result, result) + + def tearDown(self): + if os.path.exists(self.filename): + os.remove(self.filename) diff --git a/tanner/tests/test_sqlite_db_helper.py b/tanner/tests/test_sqlite_db_helper.py new file mode 100644 index 00000000..30c43ce5 --- /dev/null +++ b/tanner/tests/test_sqlite_db_helper.py @@ -0,0 +1,135 @@ +import asyncio +import os +import sqlite3 +import unittest +import subprocess +from unittest import mock + +from tanner.utils.asyncmock import AsyncMock +from tanner.utils.sqlite_db_helper import SQLITEDBHelper + + +class TestSQLiteDBHelper(unittest.TestCase): + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + self.filename = '/tmp/db/test_db' + os.makedirs(os.path.dirname(self.filename), exist_ok=True) + open('/tmp/db/test_db', 'a').close() + # Insert some testing data + conn = sqlite3.connect(self.filename) + self.cursor = conn.cursor() + self.cursor.execute('CREATE TABLE TEST (id INTEGER PRIMARY KEY, username TEXT)') + self.cursor.execute('INSERT INTO TEST VALUES(0, "test0")') + conn.commit() + + self.handler = SQLITEDBHelper() + self.returned_result = None + self.expected_result = None + + def test_setup_db_from_config(self): + config = { + "name": "test_db", + "tables": [ + { + "schema": "CREATE TABLE IF NOT EXISTS CREDS (ID INTEGER PRIMARY KEY, EMAIL VARCHAR(15), " + "PASSWORD VARCHAR(15))", + "table_name": "CREDS", + "data_tokens": "I,E,P" + } + ] + } + + def mock_read_config(): + return config + + self.result = [] + self.handler.read_config = mock_read_config + self.handler.insert_dummy_data = AsyncMock() + + calls = [ + mock.call('CREDS', 'I,E,P', mock.ANY) + ] + + self.expected_result = [[('CREATE TABLE CREDS (ID INTEGER PRIMARY KEY, EMAIL VARCHAR(15), PASSWORD ' + 'VARCHAR(15))',), ('CREATE TABLE TEST (id INTEGER PRIMARY KEY, username TEXT)',)]] + + async def test(): + await self.handler.setup_db_from_config('/tmp/', self.filename) + self.cursor.execute('SELECT sql FROM sqlite_master ORDER BY tbl_name') + result = self.cursor.fetchall() + self.result.append(result) + self.handler.delete_db(self.filename) + + self.loop.run_until_complete(test()) + + self.assertEqual(self.result, self.expected_result) + self.handler.insert_dummy_data.assert_has_calls(calls, any_order=True) + + def test_get_abs_path(self): + self.path = 'db/attacker_db' + self.returned_result = self.handler.get_abs_path(self.path, '/tmp/') + self.expected_result = '/tmp/db/attacker_db' + self.assertEqual(self.returned_result, self.expected_result) + + def test_get_abs_path_2(self): + self.path = '../../tmp/db/./test_db' + self.returned_result = self.handler.get_abs_path(self.path, '/tmp/') + self.expected_result = '/tmp/db/test_db' + self.assertEqual(self.returned_result, self.expected_result) + + def test_copy_db(self): + self.attacker_db = '/tmp/db/attacker_db' + + self.returned_result = self.handler.copy_db(self.filename, self.attacker_db, '/tmp/') + self.assertTrue(os.path.exists(self.attacker_db)) + + diff_db = "diff /tmp/db/test_db /tmp/db/attacker_db" + self.result = b'' + + # Checking if new DB is same as original DB + try: + diff_db = subprocess.Popen(diff_db, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) + self.outs, errs = diff_db.communicate(timeout=15) + diff_db.wait() + + except subprocess.CalledProcessError: + pass + + self.assertEqual(self.outs, self.result) + + # Deleting the DB + os.remove('/tmp/db/attacker_db') + + def test_create_query_map(self): + self.returned_result = self.handler.create_query_map('/tmp/db', 'test_db') + self.expected_result = {'TEST': [{'name': 'id', 'type': 'INTEGER'}, {'name': 'username', 'type': 'TEXT'}]} + self.assertEqual(self.returned_result, self.expected_result) + + @mock.patch("tanner.utils.sqlite_db_helper.sqlite3") + def test_create_query_map_error(self, sqlite): + sqlite.OperationalError = sqlite3.OperationalError + sqlite.connect().cursor().execute.side_effect = sqlite3.OperationalError + + with self.assertLogs(level='ERROR') as log: + self.returned_result = self.handler.create_query_map('/tmp/db', 'test_db') + self.assertIn('Error during query map creation', log.output[0]) + + def test_insert_dummy_data(self): + def mock_generate_dummy_data(data_tokens): + return [(1, 'test1'), (2, 'test2')], ['I', 'L'] + + self.handler.generate_dummy_data = mock_generate_dummy_data + + self.loop.run_until_complete(self.handler.insert_dummy_data('test', 'I,L', self.cursor)) + self.expected_result = [[0, 'test0'], [1, 'test1'], [2, 'test2']] + + result = [] + for row in self.cursor.execute('SELECT * FROM test;'): + result.append(list(row)) + + self.assertEqual(result, self.expected_result) + + def tearDown(self): + if os.path.exists(self.filename): + os.remove(self.filename)