From 7ebb81ee2733c2c7e5a3c504b713dcf4594c2c4a Mon Sep 17 00:00:00 2001
From: "sv.giampa"
Date: Fri, 6 Oct 2023 22:28:14 +0000
Subject: [PATCH 1/3] Runtime arguments of SocialDataApp moved to execute method

---
 parsoda/apps/sentiment_analysis.py         | 11 +++----
 parsoda/apps/trajectory_mining.py          | 11 +++----
 parsoda/model/social_data_app.py           | 36 ++++++----------------
 test/integration/test_setiment_analysis.py | 11 +++----
 test/integration/test_trajectory_mining.py | 10 ++----
 test/usecase/trajectory_mining_1m.py       |  5 +--
 test/usecase/trajectory_mining_40m.py      |  5 +--
 test/usecase/trajectory_mining_6X.py       |  7 ++---
 8 files changed, 28 insertions(+), 68 deletions(-)

diff --git a/parsoda/apps/sentiment_analysis.py b/parsoda/apps/sentiment_analysis.py
index b025d3f..7dd309a 100644
--- a/parsoda/apps/sentiment_analysis.py
+++ b/parsoda/apps/sentiment_analysis.py
@@ -14,18 +14,15 @@ def parsoda_sentiment_analysis(
-    driver: ParsodaDriver,
-    crawlers: List[Crawler],
-    *,
-    num_partitions=-1,
-    chunk_size=64,
+    *,
+    crawlers=[],
     emoji_file="./resources/input/emoji.json",
     visualization_file="./resources/output/emoji_polarization.txt",
     keywords: str = "",
     keywords_separator: str = " ",
     keywords_threshold: int = 1
-):
-    app = SocialDataApp("Sentiment Analysis", driver, num_partitions=num_partitions, chunk_size=chunk_size)
+) -> SocialDataApp:
+    app = SocialDataApp("Sentiment Analysis")
     app.set_crawlers(crawlers)
     app.set_filters([
         ContainsKeywords(
diff --git a/parsoda/apps/trajectory_mining.py b/parsoda/apps/trajectory_mining.py
index 64dc876..142e6c9 100644
--- a/parsoda/apps/trajectory_mining.py
+++ b/parsoda/apps/trajectory_mining.py
@@ -15,20 +15,17 @@ def __secondary_key(x):
     return x[0]
 
 def parsoda_trajectory_mining(
-    driver: ParsodaDriver,
-    crawlers: List[Crawler],
     rois_file: str,
-    *,
-    num_partitions=-1,
-    chunk_size=64,
+    *,
+    crawlers: List[Crawler] = [],
     min_trajectory_length=3,
     min_support=1,
     min_gap=0,
     max_gap=10,
     visualization_file="trajectory_mining.txt",
     visualization_min_length=3
-):
-    app = SocialDataApp("Trajectory Mining", driver, num_partitions=num_partitions, chunk_size=chunk_size)
+) -> SocialDataApp:
+    app = SocialDataApp("Trajectory Mining")
     app.set_crawlers(crawlers)
     app.set_filters([
         IsInRoI(rois_file)
diff --git a/parsoda/model/social_data_app.py b/parsoda/model/social_data_app.py
index e59b0cb..4da0ad5 100644
--- a/parsoda/model/social_data_app.py
+++ b/parsoda/model/social_data_app.py
@@ -4,6 +4,7 @@
 from typing import Iterable, TypeVar, Generic, Any, Callable, Protocol, Optional, List, Dict
 from parsoda.model.driver.parsoda_driver import ParsodaDriver
+from parsoda.model.driver.parsoda_multicore_driver import ParsodaMultiCoreDriver
 from parsoda.model.function.crawler import Crawler
 from parsoda.model.function.visualizer import Visualizer
 from parsoda.model import Filter, Mapper, Reducer, Analyzer
@@ -183,7 +184,7 @@ def _filter_kv_none(kv):
 
 class SocialDataApp(Generic[K, V, R, A]):
 
-    def __init__(self, app_name: str, driver: ParsodaDriver, num_partitions=None, chunk_size=128):
+    def __init__(self, app_name: str):
         """
         Creates a new social data app
         :param app_name: application (or sub-application) name
         :param chunk_size: the preferred size of the data chunks read from the input files
         """
         self.__app_name = app_name
-        self.__num_partitions = num_partitions
-        self.__chunk_size = chunk_size
-        self.__driver = driver
         self.__crawlers: List[Crawler] = []
         self.__filters: List[Filter] = []
@@ -205,20 +203,6 @@ def __init__(self, app_name: str, driver: ParsodaDriver, num_partitions=None, chunk_size=128):
         self.__visualizer: Optional[Visualizer[A]] = None
         self.__report_file: str = "parsoda_report.csv"
 
-    def set_num_partitions(self, num_partitions: int):
-        """
-        Sets the number of partitions. This is overriden by the chunk size if it is set.
-        """
-        self.__num_partitions = num_partitions
-        return self
-
-    def set_chunk_size(self, chunk_size: int):
-        """
-        Sets the data chunk size in megabytes. This parameter overrides the number of partitions.
-        """
-        self.__chunk_size = chunk_size
-        return self
-
     def set_report_file(self, filename: str):
         self.__report_file = filename
@@ -268,7 +252,7 @@ def set_visualizer(self, visualizer: Visualizer[A]):
         self.__visualizer = visualizer
         return self
 
-    def execute(self) -> ParsodaReport:
+    def execute(self, driver: ParsodaDriver=ParsodaMultiCoreDriver(), num_partitions=-1, chunk_size=128) -> ParsodaReport:
         #locale.setlocale(locale.LC_ALL, "en_US.utf8")
 
         if self.__crawlers is None or len(self.__crawlers) == 0:
@@ -286,14 +270,12 @@ def execute(self) -> ParsodaReport:
 
         # VERY IMPORTANT: de-couples all objects from "self"
         # Avoids "self" to be serialized by some execution environment (e.g., PySpark)
-        driver = self.__driver
         reducer = self.__reducer
         secondary_key = self.__secondary_sort_key_function
-
-        print(f"[ParSoDA/{self.__app_name}] initializing driver: {type(self.__driver).__name__}")
-        driver.set_chunk_size(self.__chunk_size*1024*1024)
-        driver.set_num_partitions(self.__num_partitions)
+        print(f"[ParSoDA/{self.__app_name}] initializing driver: {type(driver).__name__}")
+        driver.set_chunk_size(chunk_size*1024*1024)
+        driver.set_num_partitions(num_partitions)
         driver.init_environment()
 
         crawling_time: int
@@ -376,9 +358,9 @@ def execute(self) -> ParsodaReport:
 
         report = ParsodaReport(
             self.__app_name,
-            self.__driver,
-            self.__num_partitions,
-            self.__chunk_size,
+            driver,
+            num_partitions,
+            chunk_size,
             crawling_time,
             filter_time,
             map_time,
diff --git a/test/integration/test_setiment_analysis.py b/test/integration/test_setiment_analysis.py
index c74b35d..dd1bc8e 100644
--- a/test/integration/test_setiment_analysis.py
+++ b/test/integration/test_setiment_analysis.py
@@ -12,19 +12,16 @@
 from parsoda.model.driver.parsoda_pyspark_driver import ParsodaPySparkDriver
 from parsoda.model.driver.parsoda_singlecore_driver import ParsodaSingleCoreDriver
 from parsoda.model.driver.parsoda_multicore_driver import ParsodaMultiCoreDriver
+from parsoda.model.social_data_app import SocialDataApp
 
 def sentiment_analysis_testcase(driver):
-    app = parsoda_sentiment_analysis(
-        driver = driver,
-        crawlers = [
-            DistributedFileCrawler('resources/input/test.json', ParsodaParser())
-        ],
+    app: SocialDataApp = parsoda_sentiment_analysis(
+        crawlers=[DistributedFileCrawler('resources/input/test.json', ParsodaParser())],
         emoji_file="./resources/input/emoji.json",
         visualization_file="./resources/output/sentiment_analysis.txt",
-        chunk_size=1
     )
-    report = app.execute()
+    report = app.execute(driver, chunk_size=1)
     return report.get_reduce_result_length()
 
 class TestSentimentAnalysis(unittest.TestCase):
diff --git a/test/integration/test_trajectory_mining.py b/test/integration/test_trajectory_mining.py
index 455a684..1a2f898 100644
--- a/test/integration/test_trajectory_mining.py
+++ b/test/integration/test_trajectory_mining.py
@@ -15,15 +15,11 @@ def trajectory_mining_testcase(driver):
     app = parsoda_trajectory_mining(
-        driver = driver,
-        crawlers = [
-            DistributedFileCrawler('resources/input/test.json', ParsodaParser())
-        ],
+        crawlers=[DistributedFileCrawler('resources/input/test.json', ParsodaParser())],
         rois_file="./resources/input/RomeRoIs.kml",
-        visualization_file="./resources/output/trajectory_mining.txt",
-        chunk_size=1,
+        visualization_file="./resources/output/trajectory_mining.txt"
     )
-    report = app.execute()
+    report = app.execute(driver, chunk_size=1)
     return report.get_reduce_result_length()
 
 class TestTrajectoryMining(unittest.TestCase):
diff --git a/test/usecase/trajectory_mining_1m.py b/test/usecase/trajectory_mining_1m.py
index 96ccd9f..dab9942 100644
--- a/test/usecase/trajectory_mining_1m.py
+++ b/test/usecase/trajectory_mining_1m.py
@@ -15,14 +15,11 @@
 if __name__ == '__main__':
     test = ParsodaUseCaseParameters()
     app = parsoda_trajectory_mining(
-        driver = test.driver,
         crawlers = [
             DistributedFileCrawler('resources/input/test.json', ParsodaParser())
         ],
         rois_file="./resources/input/RomeRoIs.kml",
-        num_partitions=test.partitions,
-        chunk_size=test.chunk_size,
         visualization_file="./resources/output/trajectory_mining.txt"
     )
-    app.execute()
+    app.execute(driver = test.driver, num_partitions=test.partitions, chunk_size=test.chunk_size)
diff --git a/test/usecase/trajectory_mining_40m.py b/test/usecase/trajectory_mining_40m.py
index 60abd61..f27895d 100644
--- a/test/usecase/trajectory_mining_40m.py
+++ b/test/usecase/trajectory_mining_40m.py
@@ -15,14 +15,11 @@
 if __name__ == '__main__':
     test = ParsodaUseCaseParameters()
     app = parsoda_trajectory_mining(
-        driver = test.driver,
         crawlers = [
             DistributedFileCrawler('resources/input/synthetic_40m.json', ParsodaParser())
         ],
         rois_file="./resources/input/RomeRoIs.kml",
-        num_partitions=test.partitions,
-        chunk_size=test.chunk_size,
         visualization_file="./resources/output/trajectory_mining.txt"
     )
-    app.execute()
+    app.execute(driver=test.driver, num_partitions=test.partitions, chunk_size=test.chunk_size)
diff --git a/test/usecase/trajectory_mining_6X.py b/test/usecase/trajectory_mining_6X.py
index 6837248..54c98d6 100644
--- a/test/usecase/trajectory_mining_6X.py
+++ b/test/usecase/trajectory_mining_6X.py
@@ -15,14 +15,11 @@
 if __name__ == '__main__':
     test = ParsodaUseCaseParameters()
     app = parsoda_trajectory_mining(
-        driver = test.driver,
         crawlers = [
-            DistributedFileCrawler('/storage/dataset/TwitterRome2017_6X.json', TwitterParser())
+            DistributedFileCrawler('/storage/dataset/TwitterRome2017_6X.json', TwitterParser())
         ],
         rois_file="./resources/input/RomeRoIs.kml",
-        num_partitions=test.partitions,
-        chunk_size=test.chunk_size,
         visualization_file="./resources/output/trajectory_mining.txt"
     )
-    app.execute()
+    app.execute(driver=test.driver, num_partitions=test.partitions, chunk_size=test.chunk_size)

From f20ad8dd78ec495d481cf9e43baf624432dcf0c4 Mon Sep 17 00:00:00 2001
From: "sv.giampa"
Date: Fri, 6 Oct 2023 22:37:41 +0000
Subject: [PATCH 2/3] Update README

---
 README.md | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 99199ea..08c18ff 100755
--- a/README.md
+++ b/README.md
@@ -32,14 +32,8 @@ and run
 # ParSoDA on top of PyCOMPSs
 For using the ParSoDA library on top of PyCOMPSs, it is required to import and instantiate the ParsodaPyCompssDriver class, as in the following Trajectory Mining example:
 
-    driver = ParsodaPyCompssDriver()
-
-    app = SocialDataApp("Trajectory Mining", driver, num_partitions=args.partitions, chunk_size=args.chunk_size)
-
-    app.set_crawlers([
-        LocalFileCrawler('/root/dataset/FlickrRome2017.json', FlickrParser())
-        LocalFileCrawler('/root/dataset/TwitterRome2017.json', TwitterParser())
-    ])
+    # statically define application algorithm
+    app = SocialDataApp("Trajectory Mining")
     app.set_filters([
         IsInRoI("./resources/input/RomeRoIs.kml")
     ])
@@ -56,7 +50,13 @@ For using the ParSoDA library on top of PyCOMPSs, it is required to import and i
         )
     )
 
-    app.execute()
+    # setup runtime environment and run the application
+    driver = ParsodaPyCompssDriver()
+    app.set_crawlers([
+        LocalFileCrawler('/root/dataset/FlickrRome2017.json', FlickrParser())
+        LocalFileCrawler('/root/dataset/TwitterRome2017.json', TwitterParser())
+    ])
+    app.execute(driver, num_partitions=args.partitions, chunk_size=args.chunk_size)
 
 # Running an application
 For running an application, it is recommended to see the instructions of the execution environment used.

From 52c8a2b2fe6624f805c7b950ac8417584fbd4a16 Mon Sep 17 00:00:00 2001
From: "sv.giampa"
Date: Fri, 6 Oct 2023 22:39:12 +0000
Subject: [PATCH 3/3] Update README

---
 README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 08c18ff..5ec0a60 100755
--- a/README.md
+++ b/README.md
@@ -93,12 +93,13 @@
 For defining a new use case, the developer must follow the next steps:
 1. create a new runnable script in "test/usecase" directory;
 2. import the test.usecase.parsoda_usecase_parameters.ParsodaUseCaseParameters;
 3. load test parameters by simply creating a new ParsodaUseCaseParameters object and accessing its properties;
 4. create and run the application with hard-coded algorithm input parameters:
 
        test = ParsodaUseCaseParameters()
-       app = SocialDataApp("App Name", test.driver, test.partitions, test.chunk_size)
+       app = SocialDataApp("App Name")
        app.set_crawlers([DistributedFileCrawler("hard-coded/path/to/a_dataset", TwitterParser())])
        ...
+       app.execute(test.driver, test.partitions, test.chunk_size)
 
 ## Defining a new test runtime
 A test runtime allows to define a specific configuration for an underlying ParSoDA runtime (e.g. a specific configuration of the COMPSs environment), just for performace testing purposes.
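
Usage after this series: an application is configured once with its algorithm-level parameters, while the driver, the number of partitions, and the chunk size are supplied only when execute() is called. The following is a minimal sketch of the resulting call pattern, modeled on test/integration/test_setiment_analysis.py; the module paths are inferred from the file paths in patch 1, the DistributedFileCrawler and ParsodaParser imports are elided because their modules are not shown in these patches, and num_partitions=4 is just an example value.

    # Module paths inferred from the files touched by patch 1.
    from parsoda.apps.sentiment_analysis import parsoda_sentiment_analysis
    from parsoda.model.driver.parsoda_multicore_driver import ParsodaMultiCoreDriver
    # DistributedFileCrawler and ParsodaParser must also be imported; their module
    # paths are not shown in this series, so the imports are left out here.

    # Algorithm-level configuration, done once by the patched factory function.
    app = parsoda_sentiment_analysis(
        crawlers=[DistributedFileCrawler('resources/input/test.json', ParsodaParser())],
        emoji_file="./resources/input/emoji.json",
        visualization_file="./resources/output/sentiment_analysis.txt",
    )

    # Runtime choices are made only at execution time; switching to another driver
    # (e.g. ParsodaPySparkDriver or ParsodaPyCompssDriver) needs no change above.
    report = app.execute(ParsodaMultiCoreDriver(), num_partitions=4, chunk_size=1)
    print(report.get_reduce_result_length())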