diff --git a/docs/user/bots.rst b/docs/user/bots.rst index 4e0a269c0..f1244e958 100644 --- a/docs/user/bots.rst +++ b/docs/user/bots.rst @@ -3123,6 +3123,32 @@ Use this command to create/update the database and reload the bot: .. _intelmq.bots.experts.url2fqdn.expert: +Truncate By Delimiter +^^^^^^^^^ + +**Information** + +* `name:` `intelmq.bots.experts.truncate_by_delimiter.expert` +* `lookup:` no +* `public:` yes +* `cache (redis db):` none +* `description:` Cut string if length is bigger than max + +**Configuration Parameters** + +* `delimiter`: example . or ; +* `max_length`: max string length +* `field`: string field + +Example: Cut through a long domain with a dot. Truncated until the domain not exceeds the maximum length. + +input domain = www.subdomain.web.secondsubomain.test.domain.com +delimiter = '.' +max_length = 20 +results = test.domain.com + +.. _intelmq.bots.experts.truncate_by_delimiter.expert: + Url2FQDN ^^^^^^^^ diff --git a/intelmq/bots/experts/truncate_by_delimiter/__init__.py b/intelmq/bots/experts/truncate_by_delimiter/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/intelmq/bots/experts/truncate_by_delimiter/expert.py b/intelmq/bots/experts/truncate_by_delimiter/expert.py new file mode 100644 index 000000000..1cad74f45 --- /dev/null +++ b/intelmq/bots/experts/truncate_by_delimiter/expert.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" +Cut string if length is bigger than max + +SPDX-FileCopyrightText: 2021 Marius Karotkis <marius.karotkis@gmail.com> +SPDX-License-Identifier: AGPL-3.0-or-later +""" +from intelmq.lib.bot import Bot + + +class TruncateByDelimiterExpertBot(Bot): + delimiter: str = '.' + max_length: int = 200 + field: str = 'source.fqdn' + + def process(self): + event = self.receive_message() + + if self.field in event: + long_string = event[self.field] + while self.delimiter in long_string and len(long_string) > self.max_length: + long_string = long_string.split(self.delimiter, 1)[1] + event.change(self.field, long_string) + + self.send_message(event) + self.acknowledge_message() + + +BOT = TruncateByDelimiterExpertBot diff --git a/intelmq/tests/bots/experts/truncate_by_delimiter/__init__.py b/intelmq/tests/bots/experts/truncate_by_delimiter/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/intelmq/tests/bots/experts/truncate_by_delimiter/test_expert.py b/intelmq/tests/bots/experts/truncate_by_delimiter/test_expert.py new file mode 100644 index 000000000..f25ee83b7 --- /dev/null +++ b/intelmq/tests/bots/experts/truncate_by_delimiter/test_expert.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +""" +Testing truncate by delimiter bot + +SPDX-FileCopyrightText: 2021 Marius Karotkis <marius.karotkis@gmail.com> +SPDX-License-Identifier: AGPL-3.0-or-later +""" +import unittest +import intelmq.lib.test as test +from intelmq.bots.experts.truncate_by_delimiter.expert import TruncateByDelimiterExpertBot + +EXAMPLE_INPUT = { + '__type': 'Event', + 'feed.accuracy': 100.0, + 'feed.name': 'MISP events', + 'feed.provider': 'MISP BAE', + 'time.observation': '2020-10-20T12:57:33+00:00', + 'feed.url': 'https://sig01.threatreveal.com', + 'source.fqdn': 'bing.com.google.com.digikala.com.myket.com.divar.ir.varzesh3.pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital', + 'extra.elastic_index': 'cti-2020-10', + 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'} + +EXAMPLE_OUTPUT = { + '__type': 'Event', + 'feed.accuracy': 100.0, + 'feed.name': 'MISP events', + 'feed.provider': 'MISP BAE', + 'time.observation': '2020-10-20T12:57:33+00:00', + 'feed.url': 'https://sig01.threatreveal.com', + 'source.fqdn': 'pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital', + 'extra.elastic_index': 'cti-2020-10', + 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'} + +EXAMPLE_INPUT_2 = { + '__type': 'Event', + 'feed.accuracy': 100.0, + 'feed.name': 'MISP events', + 'feed.provider': 'MISP BAE', + 'time.observation': '2020-10-20T12:57:33+00:00', + 'feed.url': 'https://sig01.threatreveal.com', + 'extra.elastic_index': 'cti-2020-10', + 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'} + +EXAMPLE_OUTPUT_2 = { + '__type': 'Event', + 'feed.accuracy': 100.0, + 'feed.name': 'MISP events', + 'feed.provider': 'MISP BAE', + 'time.observation': '2020-10-20T12:57:33+00:00', + 'feed.url': 'https://sig01.threatreveal.com', + 'extra.elastic_index': 'cti-2020-10', + 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'} + + +class TestTruncateByDelimiterExpertBot(test.BotTestCase, unittest.TestCase): + """ + A TestCase for TestTruncateByDelimiterExpertBot. + """ + + @classmethod + def set_bot(cls): + cls.bot_reference = TruncateByDelimiterExpertBot + + def test_event_cut(self): + self.input_message = EXAMPLE_INPUT + self.run_bot() + self.assertMessageEqual(0, EXAMPLE_OUTPUT) + + def test_event_cut_without_field(self): + self.input_message = EXAMPLE_INPUT_2 + self.run_bot() + self.assertMessageEqual(0, EXAMPLE_OUTPUT_2) + + +if __name__ == '__main__': # pragma: no cover + unittest.main()