Feature #325 delete code artefacts #335

Merged · 15 commits · Sep 1, 2022
Changes from all commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -27,7 +27,7 @@ Here is a template for new release sections:
- Technology parameter is renamed to data for better comprehension [#337](https://github.com/OpenEnergyPlatform/open-MaStR/pull/337)

### Removed
- [#](https://github.com/rl-institut/super-repo/pull/)
- Removed code artefacts [#335](https://github.com/OpenEnergyPlatform/open-MaStR/pull/335)

## [v0.11.7] Patch - Hotfix - 2022-08-25
### Changed
1 change: 0 additions & 1 deletion open_mastr/mastr.py
@@ -22,7 +22,6 @@
get_project_home_dir,
)
import open_mastr.utils.orm as orm
from open_mastr.utils.data_io import cleaned_data

# import initialize_database dependencies
from open_mastr.utils.helpers import (
239 changes: 82 additions & 157 deletions open_mastr/soap_api/mirror.py
@@ -514,88 +514,89 @@ def retrieve_additional_location_data(
log.info("No further data is requested")
break

def create_additional_data_requests(
self,
data,
data_types=["unit_data", "eeg_data", "kwk_data", "permit_data"],
delete_existing=True,
):
"""
Create new requests for additional unit data

For units that exist in basic_units but not in the table for additional
data of `data_type`, a new data request
is submitted.

Parameters
----------
data: str
            Specify the data type for which additional data should be requested.
data_types: list
Select type of additional data that is to be requested.
Defaults to all data that is available for a
data type.
delete_existing: bool
Toggle deletion of already existing requests for additional data.
Defaults to True.
"""

data_requests = []

with session_scope(engine=self._engine) as session:
# Check which additional data is missing
for data_type in data_types:
if data_type_available := self.orm_map[data].get(data_type, None):
log.info(
f"Create requests for additional data of type {data_type} for {data}"
)

# Get ORM for additional data by data and data_type
additional_data_orm = getattr(orm, data_type_available)

# Delete prior additional data requests for this data and data_type
if delete_existing:
session.query(orm.AdditionalDataRequested).filter(
orm.AdditionalDataRequested.technology == data,
orm.AdditionalDataRequested.data_type == data_type,
).delete()
session.commit()
# def create_additional_data_requests(
# self,
# technology,
# data_types=["unit_data", "eeg_data", "kwk_data", "permit_data"],
# delete_existing=True,
# ):
# """
# Create new requests for additional unit data
#
# For units that exist in basic_units but not in the table for additional
# data of `data_type`, a new data request
# is submitted.
#
# Parameters
# ----------
# technology: str
# Specify technology additional data should be requested for.
# data_types: list
# Select type of additional data that is to be requested.
# Defaults to all data that is available for a
# technology.
# delete_existing: bool
# Toggle deletion of already existing requests for additional data.
# Defaults to True.
# """
#
# data_requests = []
#
# with session_scope(engine=self._engine) as session:
# # Check which additional data is missing
# for data_type in data_types:
# if data_type_available := self.orm_map[technology].get(data_type, None):
# log.info(
# f"Create requests for additional data of type {data_type} for {technology}"
# )
#
# # Get ORM for additional data by technology and data_type
# additional_data_orm = getattr(orm, data_type_available)
#
# # Delete prior additional data requests for this technology and data_type
# if delete_existing:
# session.query(orm.AdditionalDataRequested).filter(
# orm.AdditionalDataRequested.technology == technology,
# orm.AdditionalDataRequested.data_type == data_type,
# ).delete()
# session.commit()
#
# # Query database for missing additional data
# units_for_request = self._get_units_for_request(
# data_type, session, additional_data_orm, technology
# )
#
# # Prepare data for additional data request
# for basic_unit in units_for_request:
# data_request = {
# "EinheitMastrNummer": basic_unit.EinheitMastrNummer,
# "technology": self.unit_type_map[basic_unit.Einheittyp],
# "data_type": data_type,
# "request_date": datetime.datetime.now(
# tz=datetime.timezone.utc
# ),
# }
# if data_type == "unit_data":
# data_request[
# "additional_data_id"
# ] = basic_unit.EinheitMastrNummer
# elif data_type == "eeg_data":
# data_request[
# "additional_data_id"
# ] = basic_unit.EegMastrNummer
# elif data_type == "kwk_data":
# data_request[
# "additional_data_id"
# ] = basic_unit.KwkMastrNummer
# elif data_type == "permit_data":
# data_request[
# "additional_data_id"
# ] = basic_unit.GenMastrNummer
# data_requests.append(data_request)
#
# # Insert new requests for additional data into database
# session.bulk_insert_mappings(orm.AdditionalDataRequested, data_requests)

# Query database for missing additional data
units_for_request = self._get_units_for_request(
data_type, session, additional_data_orm, data
)

# Prepare data for additional data request
for basic_unit in units_for_request:
data_request = {
"EinheitMastrNummer": basic_unit.EinheitMastrNummer,
"technology": self.unit_type_map[basic_unit.Einheittyp],
"data_type": data_type,
"request_date": datetime.datetime.now(
tz=datetime.timezone.utc
),
}
if data_type == "unit_data":
data_request[
"additional_data_id"
] = basic_unit.EinheitMastrNummer
elif data_type == "eeg_data":
data_request[
"additional_data_id"
] = basic_unit.EegMastrNummer
elif data_type == "kwk_data":
data_request[
"additional_data_id"
] = basic_unit.KwkMastrNummer
elif data_type == "permit_data":
data_request[
"additional_data_id"
] = basic_unit.GenMastrNummer
data_requests.append(data_request)

# Insert new requests for additional data into database
session.bulk_insert_mappings(orm.AdditionalDataRequested, data_requests)
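
For reference, a minimal sketch of how the now-disabled method was typically invoked; the `MaStRMirror` instantiation and the chosen arguments are assumptions for illustration, not taken from this diff:

from open_mastr.soap_api.mirror import MaStRMirror

# Hypothetical usage sketch; constructor defaults are assumed.
mirror = MaStRMirror()

# Queue requests for all additional data still missing for "wind" units,
# dropping any requests already stored in AdditionalDataRequested.
mirror.create_additional_data_requests(
    "wind",
    data_types=["unit_data", "eeg_data", "permit_data"],
    delete_existing=True,
)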

def _add_data_source_and_download_date(self, entry: dict) -> dict:
"""Adds DatenQuelle = 'APT' and DatumDownload = date.today"""
@@ -1312,82 +1313,6 @@ def reverse_fill_basic_units(self, technology):

session.execute(insert_query)

def locations_to_csv(self, location_type, limit=None):
"""
Save location raw data to CSV file

During data export to CSV file, data is reshaped to tabular format.
        Data stored in JSON types is flattened and
        concatenated to separate rows.

Parameters
----------
location_type: `str`
Select type of location that is to be retrieved. Choose from
"location_elec_generation", "location_elec_consumption", "location_gas_generation",
"location_gas_consumption".
limit: int
Limit number of rows. Defaults to None which implies all rows are selected.
"""
location_type_shorthand = {
"location_elec_generation": "SEL",
"location_elec_consumption": "SVL",
"location_gas_generation": "GEL",
"location_gas_consumption": "GVL",
}

with session_scope(engine=self._engine) as session:
# Load basic and extended location data into DataFrame
locations_extended = (
session.query(
*[
c
for c in orm.LocationBasic.__table__.columns
if c.name not in ["NameDerTechnischenLokation"]
],
*[
c
for c in orm.LocationExtended.__table__.columns
if c.name not in ["MaStRNummer"]
],
)
.join(
orm.LocationExtended,
orm.LocationBasic.LokationMastrNummer
== orm.LocationExtended.MastrNummer,
)
.filter(
orm.LocationBasic.LokationMastrNummer.startswith(
location_type_shorthand[location_type]
)
)
.limit(limit)
)

df = pd.read_sql(locations_extended.statement, session.bind)

# Expand data about grid connection points from dict into separate columns
df_expanded = pd.concat(
[pd.DataFrame(x) for x in df["Netzanschlusspunkte"]], keys=df.index
).reset_index(level=1, drop=True)
df = (
df.drop("Netzanschlusspunkte", axis=1)
.join(df_expanded)
.reset_index(drop=True)
)

# Expand data about related units into separate columns (with lists of related units)
df = df.drop("VerknuepfteEinheiten", axis=1).join(
df["VerknuepfteEinheiten"].apply(list_of_dicts_to_columns)
)

# Save to file
create_data_dir()
data_path = get_data_version_dir()
filenames = get_filenames()
csv_file = os.path.join(data_path, filenames["raw"][location_type])
df.to_csv(csv_file, index=False, encoding="utf-8")
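
Before its removal, the export could be driven as in the following sketch; the `mirror` instance and the row limit are illustrative assumptions:

# Hypothetical usage sketch of the removed CSV export; a small limit keeps
# the query cheap while inspecting the flattened column layout.
mirror = MaStRMirror()
mirror.locations_to_csv("location_elec_generation", limit=100)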


def list_of_dicts_to_columns(row) -> pd.Series:
"""
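
The docstring and body of this helper are truncated in the diff view. Judging from its use on the `VerknuepfteEinheiten` column above, a plausible standalone sketch of the intended behaviour is the following; this is an assumption, since the function body is not shown here:

import pandas as pd

def list_of_dicts_to_columns_sketch(row) -> pd.Series:
    # Assumed behaviour: collect the values of each key across a list of
    # dicts, e.g. [{"a": 1}, {"a": 2, "b": 3}] becomes
    # pd.Series({"a": [1, 2], "b": [3]}), so related units end up as
    # list-valued columns.
    columns = {}
    for record in row:
        for key, value in record.items():
            columns.setdefault(key, []).append(value)
    return pd.Series(columns)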
55 changes: 28 additions & 27 deletions open_mastr/soap_api/parallel.py
@@ -1,27 +1,28 @@
import datetime
import multiprocessing
import logging
import tqdm

from open_mastr.soap_api.utils import is_time_blacklisted
log = logging.getLogger(__name__)

last_successful_download = datetime.datetime.now()

# Check if worker threads need to be killed


def _stop_execution(time_blacklist, timeout):
# Last successful execution was more than 10 minutes ago. Server seems
# unresponsive, so stop execution permanently by raising an error
if last_successful_download + datetime.timedelta(minutes=timeout) < datetime.datetime.now():
log.error('No response from server in the last {} minutes. Stopping execution.'.format(timeout))
raise ConnectionAbortedError
# Stop execution smoothly if current system time is in blacklist by
# returning. Calling function can decide to continue running later if
# needed
if time_blacklist and is_time_blacklisted(last_successful_download.time()):
log.info('Current time is in blacklist. Halting.')
return True
# ... Add more checks here if needed ...
return False
# import datetime
# import multiprocessing
# import logging
# import tqdm
#
# from open_mastr.soap_api.utils import is_time_blacklisted
# log = logging.getLogger(__name__)
#
# last_successful_download = datetime.datetime.now()
#
# # Check if worker threads need to be killed
#
#
# def _stop_execution(time_blacklist, timeout):
# # Last successful execution was more than 10 minutes ago. Server seems
# # unresponsive, so stop execution permanently by raising an error
# if last_successful_download + datetime.timedelta(minutes=timeout) < datetime.datetime.now():
# log.error('No response from server in the last {} minutes. Stopping execution.'.format(timeout))
# raise ConnectionAbortedError
# # Stop execution smoothly if current system time is in blacklist by
# # returning. Calling function can decide to continue running later if
# # needed
# if time_blacklist and is_time_blacklisted(last_successful_download.time()):
# log.info('Current time is in blacklist. Halting.')
# return True
# # ... Add more checks here if needed ...
# return False
#
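
For context, a rough sketch of the download loop that this now-commented-out helper guarded; the loop structure and the `download_chunk` function are illustrative assumptions, not code from this repository:

def download_all(chunks, time_blacklist=True, timeout=10):
    # Hypothetical driver loop; download_chunk is a stand-in for the
    # actual per-chunk SOAP download.
    results = []
    for chunk in chunks:
        # _stop_execution raises ConnectionAbortedError once `timeout`
        # minutes pass without a successful download, and returns True
        # during blacklisted hours so the caller can pause and resume.
        if _stop_execution(time_blacklist, timeout):
            break
        results.append(download_chunk(chunk))
    return results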