diff --git a/README.rst b/README.rst index fc35b45b..1172c3e9 100755 --- a/README.rst +++ b/README.rst @@ -49,6 +49,12 @@ The NSRDB `Data Model aggregation framework that sources, processes, and prepares data for input to All-Sky. +The MLClouds Model +================== +The `MLClouds Model `_ is used to predict +missing cloud properties (a.k.a. Gap Fill). The NSRDB interface with MLClouds +can be found `here `_. + Installation ============ diff --git a/nsrdb/cli.py b/nsrdb/cli.py index a0115258..692f306b 100755 --- a/nsrdb/cli.py +++ b/nsrdb/cli.py @@ -125,11 +125,16 @@ def main(ctx, config, verbose): To do a standard CONUS / Full Disc run use the following commands:: - $ config='{"year": , "out_dir": }' - $ python -m nsrdb.cli create-configs -c config + $ CONFIG='{"year": , "out_dir": }' + + $ python -m nsrdb.cli create-configs -c ${CONFIG} + $ cd + $ bash run.sh (run this until all main steps are complete) + $ cd post_proc + $ bash run.sh (run this until all post-proc steps are complete) See the help pages of the module CLIs for more details on the config files @@ -248,16 +253,17 @@ def pipeline(ctx, config, cancel, monitor, background, verbose): @click.option( '--run_type', '-r', - default='full', + default='surfrad', type=str, - help="""Run type to create configs for. Can be "full" (generates all config - and pipline files for the given year, including all domain main runs, - blending, aggregation, and collection), or "main" (for standard run without - post-processing, with data-model, ml-cloud-fill, all-sky, and - collect-data-model), "aggregate" (for aggregating post-2018 data to - pre-2018 resolution), "blend" (for blending east and west domains into a - single domain), or "post" (for all blending / aggregation / collection for - a given year)""", + help="""Run type to create configs for. Can be "surfrad" (writes a + single template config, with any provided kwargs replaced and a surfrad + meta file), "full" (generates all config and pipeline files for the given + year, including all domain main runs, blending, aggregation, and + collection), or "main" (for standard run without post-processing, with + data-model, ml-cloud-fill, all-sky, and collect-data-model), "aggregate" + (for aggregating post-2018 data to pre-2018 resolution), "blend" (for + blending east and west domains into a single domain), or "post" (for all + blending / aggregation / collection for a given year)""", ) @click.option( '--all_domains', @@ -277,19 +283,22 @@ def pipeline(ctx, config, cancel, monitor, background, verbose): ) @click.pass_context def create_configs( - ctx, config, run_type='full', all_domains=False, collect=False + ctx, config, run_type='surfrad', all_domains=False, collect=False ): """Create config files for standard NSRDB runs using config templates. - Examples - -------- - $ python -m nsrdb.cli create-configs -c '{"year": 2020, "out_dir": "./"}' + To generate all full_disc / conus run directories for east / + west regions, each with main routine config files contained, run the + following:: + + $ CONFIG='{"year": 2020, "out_dir": "./"}' - The above will generate all full_disc / conus run directories for east / - west regions, each with main routine config files contained.
Additionally, - conus / full_disc blend configs, aggregation config, collection config, and - a post processing pipeline config with all these steps will be written to a - "post_proc" directory so that post-processing can be run simply with:: + $ python -m nsrdb.cli create-configs --run_type full -c ${CONFIG} + + Additionally, conus / full_disc blend configs, aggregation config, + collection config, and a post processing pipeline config with all these + steps will be written to a "post_proc" directory so that post-processing + can be run simply with:: $ python -m nsrdb.cli pipeline -c config_pipeline_post.json """ @@ -297,7 +306,7 @@ def create_configs( ctx.ensure_object(dict) func_name = f'collect_{run_type}' if collect else run_type func_name = 'main_all' if run_type == 'main' and all_domains else func_name - valid_types = ['full', 'main', 'aggregate', 'blend', 'post'] + valid_types = ['full', 'main', 'aggregate', 'blend', 'post', 'surfrad'] msg = ( f'Received unknown "run_type" {run_type}. Accepted values are ' f'{valid_types}' @@ -436,6 +445,7 @@ def ml_cloud_fill(ctx, config, verbose=False, pipeline_step=None): "col_chunk": 10000, "fill_all": false, "max_workers": 4 + "model_path": ... } } @@ -659,7 +669,7 @@ def collect_data_model(ctx, config, verbose=False, pipeline_step=None): '-c', type=CONFIG_TYPE, required=True, - help='Path to config file or dict with kwargs for NSRDB.all_sky()', + help='Path to config file or dict with kwargs for NSRDB.collect_final()', ) @click.option( '-v', @@ -692,6 +702,34 @@ def collect_final(ctx, config, verbose=False, pipeline_step=None): ) +@main.command() +@click.option( + '--config', + '-c', + type=CONFIG_TYPE, + required=True, + help='Path to config file or dict with kwargs for NSRDB.collect_daily()', +) +@click.option( + '-v', + '--verbose', + is_flag=True, + help='Flag to turn on debug logging. 
Default is False.', +) +@click.pass_context +def collect_daily(ctx, config, verbose=False, pipeline_step=None): + """Collect daily files into a final file.""" + + BaseCLI.kickoff_single( + ctx=ctx, + module_name=ModuleName.COLLECT_DAILY, + func=Collector.collect_daily, + config=config, + verbose=verbose, + pipeline_step=pipeline_step, + ) + + @main.command() @click.option( '--config', @@ -1045,6 +1083,7 @@ def batch( Pipeline.COMMANDS[ModuleName.AGGREGATE] = aggregate Pipeline.COMMANDS[ModuleName.COLLECT_DATA_MODEL] = collect_data_model Pipeline.COMMANDS[ModuleName.COLLECT_FINAL] = collect_final +Pipeline.COMMANDS[ModuleName.COLLECT_DAILY] = collect_daily Pipeline.COMMANDS[ModuleName.TMY] = tmy Pipeline.COMMANDS[ModuleName.COLLECT_BLEND] = collect_blend Pipeline.COMMANDS[ModuleName.COLLECT_AGGREGATE] = collect_aggregate diff --git a/nsrdb/config/create_configs.py b/nsrdb/config/create_configs.py index 0edd785d..eab7dbfb 100755 --- a/nsrdb/config/create_configs.py +++ b/nsrdb/config/create_configs.py @@ -34,10 +34,12 @@ 'meta_dir': DEFAULT_META_DIR, } -MAIN_KWARGS = { - **BASE_KWARGS, - 'extent': 'full', - 'satellite': 'east', +MAIN_KWARGS = {**BASE_KWARGS, 'extent': 'full', 'satellite': 'east'} + +SURFRAD_KWARGS = { + **MAIN_KWARGS, + 'freq': '15min', + 'spatial': '4km', } BLEND_KWARGS = { @@ -47,10 +49,7 @@ 'main_dir': '../', } -COLLECT_BLEND_KWARGS = { - **BASE_KWARGS, - 'extent': 'full', -} +COLLECT_BLEND_KWARGS = {**BASE_KWARGS, 'extent': 'full'} AGG_KWARGS = { **BASE_KWARGS, @@ -62,7 +61,7 @@ 'conus_freq': '5min', 'final_freq': '30min', 'n_chunks': 32, - 'source_priority': ['conus', 'full_disk'], + 'source_priority': ['conus', 'full_disc'], } COLLECT_AGG_KWARGS = { @@ -102,14 +101,16 @@ class CreateConfigs: standard CONUS / Full Disc runs.""" MAIN_RUN_NAME = '{basename}_{satellite}_{extent}_{year}_{spatial}_{freq}' + SURFRAD_RUN_NAME = '{basename}_{year}_surfrad' BLEND_RUN_NAME = '{basename}_{extent}_{year}_blend' AGG_RUN_NAME = '{basename}_{year}_aggregate' COLLECT_AGG_RUN_NAME = '{basename}_{year}_collect_aggregate' COLLECT_BLEND_RUN_NAME = '{basename}_{extent}_{year}_collect_blend' @classmethod - def _init_kwargs(cls, kwargs, default_kwargs): + def init_kwargs(cls, kwargs=None, default_kwargs=None): """Initialize config with default kwargs.""" + default_kwargs = default_kwargs or {} msg = f'kwargs must have a "year" key. Received {kwargs}.' 
assert 'year' in kwargs, msg config = copy.deepcopy(default_kwargs) @@ -211,6 +212,7 @@ def _get_run_name(cls, config, run_type='main'): {k: v for k, v in BASE_KWARGS.items() if k not in config} ) pattern_dict = { + 'surfrad': cls.SURFRAD_RUN_NAME, 'main': cls.MAIN_RUN_NAME, 'blend': cls.BLEND_RUN_NAME, 'aggregate': cls.AGG_RUN_NAME, @@ -227,7 +229,7 @@ def _get_run_name(cls, config, run_type='main'): return pattern.format(**run_config) @classmethod - def _update_run_templates(cls, config): + def _update_run_templates(cls, config, run_type='main'): """Replace format keys and dictionary keys in config templates with user input values.""" @@ -236,6 +238,17 @@ def _update_run_templates(cls, config): f'{pprint.pformat(config, indent=2)}' ) + config['doy_range'] = config.get( + 'doy_range', + ([1, 367] if calendar.isleap(config['year']) else [1, 366]), + ) + config['start_doy'], config['end_doy'] = ( + config['doy_range'][0], + config['doy_range'][1], + ) + config['run_name'] = cls._get_run_name(config, run_type=run_type) + config['out_dir'] = os.path.join(config['out_dir'], config['run_name']) + template = ( PRE2018_CONFIG_TEMPLATE if int(config['year']) < 2018 @@ -263,6 +276,22 @@ def _update_run_templates(cls, config): config_dict, cls._get_config_file(config, 'pipeline') ) + run_file = os.path.join(config['out_dir'], 'run.sh') + with open(run_file, 'w') as f: + f.write('python -m nsrdb.cli pipeline -c config_pipeline.json') + + logger.info(f'Saved run script: {run_file}.') + + @classmethod + def surfrad(cls, kwargs): + """Get basic config template with specified parameters replaced.""" + config = cls.init_kwargs(kwargs, SURFRAD_KWARGS) + config['extent_tag'] = EXTENT_MAP['extent_tag'][config['extent']] + config['meta_file'] = os.path.join( + config['meta_dir'], 'surfrad_meta.csv' + ) + cls._update_run_templates(config, run_type='surfrad') + @classmethod def main(cls, kwargs): """Modify config files with specified parameters Parameters ---------- kwargs : dict Dictionary of parameters including year, basename, satellite, extent, freq, spatial, meta_file, doy_range """ - config = cls._init_kwargs(kwargs, MAIN_KWARGS) + config = cls.init_kwargs(kwargs, MAIN_KWARGS) msg = ( '"extent" key not provided. Provide "extent" so correct input ' 'data can be selected' ) @@ -284,27 +313,8 @@ def main(cls, kwargs): config['meta_file'] = cls._get_meta(config) config['spatial'], config['freq'] = cls._get_res(config) - config['doy_range'] = config.get( - 'doy_range', - ([1, 367] if calendar.isleap(config['year']) else [1, 366]), - ) - - config['start_doy'], config['end_doy'] = ( - config['doy_range'][0], - config['doy_range'][1], - ) - - config['run_name'] = cls._get_run_name(config) - config['out_dir'] = os.path.join(config['out_dir'], config['run_name']) - cls._update_run_templates(config) - run_file = os.path.join(config['out_dir'], 'run.sh') - with open(run_file, 'w') as f: - f.write('python -m nsrdb.cli pipeline -c config_pipeline.json') - - logger.info(f'Saved run script: {run_file}.') - @classmethod def main_all(cls, kwargs): """Modify config files for all domains with specified parameters. @@ -463,7 +473,7 @@ def _get_agg_entry(cls, config, extent): @classmethod def _aggregate(cls, kwargs): - """Get config for conus and full disk high-resolution to low-resolution + """Get config for conus and full disc high-resolution to low-resolution aggregation.
This is then used as the input to `nsrdb.cli.aggregate` Parameters @@ -472,14 +482,14 @@ def _aggregate(cls, kwargs): Dictionary with keys specifying the case for which to aggregate files """ - config = cls._init_kwargs(kwargs, AGG_KWARGS) + config = cls.init_kwargs(kwargs, AGG_KWARGS) if config['year'] == 2018: data = NSRDB_2018 else: data = { - 'full_disk': cls._get_agg_entry(config, extent='full'), + 'full_disc': cls._get_agg_entry(config, extent='full'), 'conus': cls._get_agg_entry(config, extent='conus'), 'final': cls._get_agg_entry(config, extent='final'), } @@ -490,7 +500,7 @@ def _aggregate(cls, kwargs): @classmethod def aggregate(cls, kwargs): - """Get config for conus and full disk high-resolution to low-resolution + """Get config for conus and full disc high-resolution to low-resolution aggregation. This is then used as the input to `nsrdb.cli.aggregate` Parameters @@ -521,7 +531,7 @@ def _blend(cls, kwargs): Dictionary with keys specifying the case for which to blend data files """ - config = cls._init_kwargs(kwargs, BLEND_KWARGS) + config = cls.init_kwargs(kwargs, BLEND_KWARGS) config['map_col'] = EXTENT_MAP['map_col'][config['extent']] config['lon_seam'] = EXTENT_MAP['lon_seam'][config['extent']] config['meta_file'] = cls._get_meta(config, run_type='blend') @@ -599,7 +609,7 @@ def _collect_blend(cls, kwargs): Dictionary with keys specifying the case for blend collection """ - config = cls._init_kwargs(kwargs, COLLECT_BLEND_KWARGS) + config = cls.init_kwargs(kwargs, COLLECT_BLEND_KWARGS) config['meta_final'] = cls._get_meta(config, run_type='collect-blend') config['collect_dir'] = cls._get_run_name(config, run_type='blend') config['collect_tag'] = config['collect_dir'].replace('_blend', '') @@ -650,7 +660,7 @@ def _collect_aggregate(cls, kwargs): kwargs : dict Dictionary with keys specifying the case for aggregation collection """ - config = cls._init_kwargs(kwargs, COLLECT_AGG_KWARGS) + config = cls.init_kwargs(kwargs, COLLECT_AGG_KWARGS) config['meta_final'] = cls._get_meta( config, run_type='collect-aggregate' diff --git a/nsrdb/config/templates/config_nsrdb_post2017.json b/nsrdb/config/templates/config_nsrdb_post2017.json index 32e2abbe..bac2ffd0 100755 --- a/nsrdb/config/templates/config_nsrdb_post2017.json +++ b/nsrdb/config/templates/config_nsrdb_post2017.json @@ -5,7 +5,7 @@ "max_workers": 10, "n_chunks": 1, "memory": 178, - "n_writes": 50, + "n_writes": 5, "walltime": 48 }, "daily-all-sky": {}, @@ -80,4 +80,4 @@ "fill_all": false, "max_workers": 4 } -} +} \ No newline at end of file diff --git a/nsrdb/config/templates/config_nsrdb_pre2018.json b/nsrdb/config/templates/config_nsrdb_pre2018.json index 34f44051..e76f5e6a 100755 --- a/nsrdb/config/templates/config_nsrdb_pre2018.json +++ b/nsrdb/config/templates/config_nsrdb_pre2018.json @@ -2,7 +2,7 @@ "collect-data-model": { "final": true, "max_workers": 10, - "n_writes": 50, + "n_writes": 5, "n_chunks": 1, "final_file_name": "%basename%_%satellite%_%extent%_%spatial%_%freq%" }, @@ -77,4 +77,4 @@ "fill_all": false, "max_workers": null } -} +} \ No newline at end of file diff --git a/nsrdb/data_model/clouds.py b/nsrdb/data_model/clouds.py index 4fc9941f..aba0be0d 100755 --- a/nsrdb/data_model/clouds.py +++ b/nsrdb/data_model/clouds.py @@ -215,7 +215,7 @@ def correct_coords( "parallax") or B) the sun position so that clouds are mapped to the coordinates they are shading (option == "shading"). 
""" - + logger.info(f'Correcting cloud coordinates with option = {option}.') shapes = { 'lat': lat.shape, 'lon': lon.shape, @@ -321,7 +321,7 @@ def __str__(self): def get_dset(self, dset): """Abstract placeholder for data retrieval method""" raise NotImplementedError( - 'get_dset() must be defined for H5 or NC ' 'file types.' + 'get_dset() must be defined for H5 or NC file types.' ) @property diff --git a/nsrdb/file_handlers/collection.py b/nsrdb/file_handlers/collection.py index 54d86b67..b057d06c 100755 --- a/nsrdb/file_handlers/collection.py +++ b/nsrdb/file_handlers/collection.py @@ -5,6 +5,7 @@ import logging import os from concurrent.futures import as_completed +from warnings import warn import numpy as np import pandas as pd @@ -207,7 +208,16 @@ def get_slices(final_time_index, final_meta, new_time_index, new_meta): row_slice = slice(np.min(row_loc), np.max(row_loc) + 1) col_slice = slice(np.min(col_loc), np.max(col_loc) + 1) - return row_slice, col_slice + if col_slice.stop - col_slice.start != len(col_loc): + msg = ( + 'Indices for coordinates are not ascending and / or ' + 'contiguous.' + ) + logger.warning(msg) + warn(msg) + col_slice = col_loc + + return row_slice, col_loc @staticmethod def get_data( @@ -664,7 +674,7 @@ def collect_flist_lowmem( def collect_daily( cls, collect_dir, - f_out, + fn_out, dsets, sites=None, n_writes=1, @@ -687,7 +697,7 @@ def collect_daily( collect_dir : str Directory of chunked files. Each file should be one variable for one day. - f_out : str + fn_out : str File path of final output file. dsets : list | str List of datasets / variable names to collect. Can also be a single @@ -720,7 +730,9 @@ def collect_daily( json.loads(dsets) if '[' in dsets and ']' in dsets else [dsets] ) - logger.info('Collecting data from {} to {}'.format(collect_dir, f_out)) + logger.info( + 'Collecting data from {} to {}'.format(collect_dir, fn_out) + ) for i, dset in enumerate(dsets): logger.debug('Collecting dataset "{}".'.format(dset)) @@ -747,11 +759,11 @@ def collect_daily( logger.error(e) raise ValueError(e) - if not os.path.exists(f_out): + if not os.path.exists(fn_out): time_index, meta, _ = collector._get_collection_attrs( collector.flist, collect_dir, sites=sites ) - collector._init_collected_h5(f_out, time_index, meta) + collector._init_collected_h5(fn_out, time_index, meta) flist_chunks = np.array_split( np.array(collector.flist), n_writes @@ -767,7 +779,7 @@ def collect_daily( collector.collect_flist( flist, collect_dir, - f_out, + fn_out, dset, sites=sites, var_meta=var_meta, diff --git a/nsrdb/file_handlers/surfrad.py b/nsrdb/file_handlers/surfrad.py index 40652e76..bda6dd47 100755 --- a/nsrdb/file_handlers/surfrad.py +++ b/nsrdb/file_handlers/surfrad.py @@ -59,8 +59,7 @@ def get_window_size(df, window_minutes=61): ) n_steps = len(np.where(one_hr_mask)[0]) window = int(np.ceil((window_minutes / 60) * n_steps)) - - return window + return np.max((1, window)) @property def native_df(self): diff --git a/nsrdb/gap_fill/mlclouds_fill.py b/nsrdb/gap_fill/mlclouds_fill.py index e371efe8..ae74cfa5 100755 --- a/nsrdb/gap_fill/mlclouds_fill.py +++ b/nsrdb/gap_fill/mlclouds_fill.py @@ -12,7 +12,7 @@ import psutil from farms import ICE_TYPES, WATER_TYPES from mlclouds import MODEL_FPATH -from phygnn import PhygnnModel +from mlclouds.model.multi_step import MultiCloudsModel from rex import MultiFileNSRDB from rex.utilities.execution import SpawnProcessPool @@ -45,11 +45,13 @@ def __init__( fill_all : bool Flag to fill all cloud properties for all timesteps where 
cloud_type is cloudy. - model_path : str | None - Directory to load phygnn model from. This is typically a fpath to - a .pkl file with an accompanying .json file in the same directory. - None will try to use the default model path from the mlclouds - project directory. + model_path : dict | None + kwargs for ``MultiCloudsModel.load`` method. Specifies + ``cloud_prop_model_path`` for the cloud property model and optionally + ``cloud_type_model_path`` for a cloud type model. Each value is + typically a fpath to a .pkl file with an accompanying .json file in + the same directory. None will try to use the default model path + from the mlclouds project directory. var_meta : str | pd.DataFrame | None CSV file or dataframe containing meta data for all NSRDB variables. Defaults to the NSRDB var meta csv in git repo. @@ -76,7 +78,8 @@ def __init__( self._fill_all ) ) - self._phygnn_model = PhygnnModel.load(model_path) + + self._phygnn_model = MultiCloudsModel.load(**model_path) if self.h5_source is not None: with MultiFileNSRDB(self.h5_source) as res: @@ -497,7 +500,7 @@ def archive_cld_properties(self): 'cld_press_acha', 'cloud_type', ] - for dset in cld_dsets: + for dset in [ds for ds in cld_dsets if ds in self.dset_map]: src_fpath = self.dset_map[dset] src_dir, f_name = os.path.split(src_fpath) dst_dir = os.path.join(src_dir, 'raw') @@ -605,16 +608,20 @@ def predict_cld_properties( col_slice ) ) + predict_feats = set(feature_df.columns).intersection( + self.phygnn_model.input_feature_names + ) + predict_feats = feature_df[list(predict_feats)] if not low_mem: - labels = self.phygnn_model.predict(feature_df, table=False) + labels = self.phygnn_model.predict(predict_feats, table=False) else: - len_df = len(feature_df) + len_df = len(predict_feats) chunks = np.array_split( np.arange(len_df), int(np.ceil(len_df / 1000)) ) labels = [] for index_chunk in chunks: - sub = feature_df.iloc[index_chunk] + sub = predict_feats.iloc[index_chunk] labels.append(self.phygnn_model.predict(sub, table=False)) labels = np.concatenate(labels, axis=0) @@ -633,7 +640,7 @@ def predict_cld_properties( shape = feature_data['flag'].shape predicted_data = {} - for i, dset in enumerate(self.phygnn_model.label_names): + for i, dset in enumerate(self.phygnn_model.output_names): logger.debug('Reshaping predicted {} to {}'.format(dset, shape)) predicted_data[dset] = labels[:, i].reshape(shape, order='F') @@ -1177,7 +1184,14 @@ def clean_data_model( return data_model @classmethod - def merra_clouds(cls, h5_source, var_meta=None, merra_fill_flag=8): + def merra_clouds( + cls, + h5_source, + var_meta=None, + merra_fill_flag=8, + fill_all=False, + model_path=None, + ): """Quick check to see if cloud data is from a merra source in which case it should be gap-free and cloud_fill_flag will be written with all 8's @@ -1195,6 +1209,14 @@ def merra_clouds(cls, h5_source, var_meta=None, merra_fill_flag=8): merra_fill_flag : int Integer fill flag representing where merra data was used as source cloud data. + fill_all : bool + Flag to fill all cloud properties for all timesteps where + cloud_type is cloudy. + model_path : dict | None + kwargs for ``MultiCloudsModel.load`` method, specifying + ``cloud_prop_model_path`` and optionally ``cloud_type_model_path``. + None will try to use the default model path from the mlclouds + project directory.
Returns ------- @@ -1202,7 +1224,12 @@ def merra_clouds(cls, h5_source, var_meta=None, merra_fill_flag=8): Flag that is True if cloud data is from merra """ - mlclouds = cls(h5_source, var_meta=var_meta) + mlclouds = cls( + h5_source, + var_meta=var_meta, + model_path=model_path, + fill_all=fill_all, + ) with MultiFileNSRDB(h5_source) as res: attrs = res.attrs.get('cld_opd_dcomp', {}) diff --git a/nsrdb/nsrdb.py b/nsrdb/nsrdb.py index e74fc2c1..ac37ea0f 100755 --- a/nsrdb/nsrdb.py +++ b/nsrdb/nsrdb.py @@ -762,10 +762,10 @@ def collect_data_model( @classmethod def collect_final( cls, - collect_dir, out_dir, year, grid, + collect_dir=None, freq='5min', var_meta=None, i_fname=None, @@ -777,8 +777,6 @@ def collect_final( Parameters ---------- - collect_dir : str - Directory with chunked files to be collected. out_dir : str Project directory. year : int | str @@ -786,6 +784,8 @@ def collect_final( grid : str Final/full NSRDB grid file. The first column must be the NSRDB site gid's. + collect_dir : str + Directory with chunked files to be collected. freq : str Final desired NSRDB temporal frequency. var_meta : str | pd.DataFrame | None @@ -804,7 +804,7 @@ def collect_final( Logging level (DEBUG, INFO). If None, no logging will be initialized. """ - + collect_dir = collect_dir or os.path.join(out_dir, 'collect') nsrdb = cls(out_dir, year, grid, freq=freq, var_meta=var_meta) nsrdb._init_loggers(log_file=log_file, log_level=log_level) ti = nsrdb._parse_data_model_output_ti(nsrdb._daily_dir, freq) @@ -979,7 +979,12 @@ def ml_cloud_fill( h5_source = os.path.join(nsrdb._daily_dir, str(date) + '_*.h5') nsrdb._init_loggers(log_file=log_file, log_level=log_level) - is_merra = MLCloudsFill.merra_clouds(h5_source, var_meta=var_meta) + is_merra = MLCloudsFill.merra_clouds( + h5_source, + var_meta=var_meta, + model_path=model_path, + fill_all=fill_all, + ) if not is_merra: MLCloudsFill.run( diff --git a/nsrdb/preprocessing/__init__.py b/nsrdb/preprocessing/__init__.py new file mode 100644 index 00000000..7e36d39d --- /dev/null +++ b/nsrdb/preprocessing/__init__.py @@ -0,0 +1,2 @@ +"""Miscellaneous preprocessing utilities. e.g. 
Converting NASA Polar data to +standard format.""" diff --git a/nsrdb/preprocessing/nasa_data_model.py b/nsrdb/preprocessing/nasa_data_model.py new file mode 100644 index 00000000..2b508381 --- /dev/null +++ b/nsrdb/preprocessing/nasa_data_model.py @@ -0,0 +1,286 @@ +"""Convert NASA data to UWISC format.""" + +import argparse +import logging +import os +import re +from functools import cached_property +from glob import glob + +import dask +import numpy as np +import pandas as pd +import xarray as xr +from rex import init_logger + +init_logger('nsrdb', log_level='DEBUG') +init_logger(__name__, log_level='DEBUG') + +logger = logging.getLogger(__name__) + +DROP_VARS = ['relative_time'] + +NAME_MAP = { + 'BT_3.75um': 'temp_3_75um_nom', + 'BT_10.8um': 'temp_11_0um_nom', + 'ref_0.63um': 'refl_0_65um_nom', + 'cloud_optical_depth': 'cld_opd_dcomp', + 'cloud_eff_particle_size': 'cld_reff_dcomp', + 'cloud_eff_pressure': 'cld_press_acha', + 'cloud_eff_height': 'cld_height_acha', + 'cloud_phase': 'cloud_type', + 'solar_zenith': 'solar_zenith_angle', + 'view_zenith': 'sensor_zenith_angle', + 'relative_azimuth': 'solar_azimuth_angle', +} + +UWISC_CLOUD_TYPE = { + 'N/A': -15, + 'Clear': 0, + 'Probably Clear': 1, + 'Fog': 2, + 'Water': 3, + 'Super-Cooled Water': 4, + 'Mixed': 5, + 'Opaque Ice': 6, + 'Cirrus': 7, + 'Overlapping': 8, + 'Overshooting': 9, + 'Unknown': 10, + 'Dust': 11, + 'Smoke': 12, +} + +NASA_CLOUD_TYPE = { + 'Clear sky snow/ice': 0, + 'Water cloud': 1, + 'Ice cloud': 2, + 'No cloud property retrievals': 3, + 'Clear sky land/water': 4, + 'Bad input data': 5, + 'Possible water cloud': 6, + 'Possible ice cloud': 7, + 'Cleaned data': 13, +} + +CLOUD_TYPE_MAP = { + 0: 'Clear', + 1: 'Water', + 2: 'Opaque Ice', + 3: 'Unknown', + 4: 'Clear', + 5: 'N/A', + 6: 'Water', + 7: 'Opaque Ice', + 13: 'Unknown', +} + + +class NasaDataModel: + """Class to handle conversion of nasa data to standard uwisc style format + for NSRDB pipeline""" + + def __init__(self, input_file, output_pattern): + """ + Parameters + ---------- + input_file : str + e.g. "./2017/01/01/nacomposite_2017.001.0000.nc" + output_pattern : str + Needs to include year, doy, and timestamp format keys. + e.g. "./{year}/{doy}/nacomposite_{timestamp}.nc" + """ + self.input_file = input_file + self.output_pattern = output_pattern + + @cached_property + def timestamp(self): + """Get year, doy, hour from input file name. 
+ + TODO: Should get this from relative_time variables to be more precise + """ + match_pattern = r'.*_([0-9]+).([0-9]+).([0-9]+).\w+' + ts = re.match(match_pattern, self.input_file).groups() + year, doy, hour = ts + secs = '000' + return year, doy, hour, secs + + @cached_property + def output_file(self): + """Get output file name for given output pattern.""" + year, doy, _, _ = self.timestamp + return self.output_pattern.format( + year=year, doy=doy, timestamp=f's{"".join(self.timestamp)}' + ) + + @cached_property + def ds(self): + """Get xarray dataset for raw input file""" + return xr.open_mfdataset( + self.input_file, + **{'group': 'map_data', 'decode_times': False}, + format='NETCDF4', + engine='h5netcdf', + ) + + @classmethod + def rename_vars(cls, ds): + """Rename variables to uwisc conventions""" + for k, v in NAME_MAP.items(): + if k in ds.data_vars: + ds = ds.rename({k: v}) + return ds + + @classmethod + def drop_vars(cls, ds): + """Drop list of variables""" + for v in DROP_VARS: + if v in ds.data_vars: + ds = ds.drop_vars(v) + return ds + + @classmethod + def remap_dims(cls, ds): + """Rename dims and coords to standards. Make lat / lon into 2d arrays, + as expected by cloud regridding routine.""" + + sdims = ('south_north', 'west_east') + for var in ds.data_vars: + single_ts = ( + 'time' in ds[var].dims + and ds[var].transpose('time', ...).shape[0] == 1 + ) + if single_ts and var != 'reference_time': + ds[var] = (sdims, ds[var].isel(time=0).data) + + ref_time = ds.attrs.get('reference_time', None) + if ref_time is not None: + ti = pd.DatetimeIndex([ref_time]).values + ds = ds.assign_coords({'time': ('time', ti)}) + if 'Lines' in ds.dims: + ds = ds.swap_dims({'Lines': 'south_north', 'Pixels': 'west_east'}) + if 'lat' in ds.coords: + ds = ds.rename({'lat': 'latitude', 'lon': 'longitude'}) + + ds['south_north'] = ds['latitude'] + ds['west_east'] = ds['longitude'] + + lons, lats = np.meshgrid(ds['longitude'], ds['latitude']) + ds = ds.assign_coords( + {'latitude': (sdims, lats), 'longitude': (sdims, lons)} + ) + return ds + + @classmethod + def remap_cloud_phase(cls, ds): + """Map nasa cloud phase flags to uwisc values.""" + ct_name = NAME_MAP['cloud_phase'] + cloud_type = ds[ct_name].values.copy() + for val, cs_type in CLOUD_TYPE_MAP.items(): + cloud_type = np.where( + ds[ct_name].values.astype(int) == int(val), + UWISC_CLOUD_TYPE[cs_type], + cloud_type, + ) + ds[ct_name] = (ds[ct_name].dims, cloud_type) + return ds + + @classmethod + def derive_stdevs(cls, ds): + """Derive standard deviations of some variables, which are used as + training features.""" + for var_name in ('refl_0_65um_nom', 'temp_11_0um_nom'): + stddev = ( + ds[var_name] + .rolling( + south_north=3, west_east=3, center=True, min_periods=1 + ) + .std() + ) + ds[f'{var_name}_stddev_3x3'] = stddev + return ds + + @classmethod + def write_output(cls, ds, output_file): + """Write converted dataset to output_file.""" + os.makedirs(os.path.dirname(output_file), exist_ok=True) + ds = ds.transpose('south_north', 'west_east', ...) + ds.load().to_netcdf(output_file, format='NETCDF4', engine='h5netcdf') + + @classmethod + def run(cls, input_file, output_pattern): + """Run conversion routine and write converted dataset.""" + dm = cls(input_file, output_pattern) + + if os.path.exists(dm.output_file): + logger.info( + '%s already exists. 
Skipping conversion.', dm.output_file + ) + else: + logger.info('Getting xarray dataset for %s', input_file) + ds = dm.ds + + logger.info('Remapping dimensions.') + ds = dm.remap_dims(ds) + + logger.info('Renaming variables.') + ds = dm.rename_vars(ds) + + logger.info('Dropping some variables.') + ds = dm.drop_vars(ds) + + logger.info('Remapping cloud type values.') + ds = dm.remap_cloud_phase(ds) + + logger.info('Deriving some stddev variables.') + ds = dm.derive_stdevs(ds) + + logger.info('Writing converted file to %s', dm.output_file) + dm.write_output(ds, dm.output_file) + + +def run_jobs(input_pattern, output_pattern, max_workers=None): + """Run multiple file conversion jobs""" + + files = glob(input_pattern) + + tasks = [ + dask.delayed(NasaDataModel.run)( + input_file=file, output_pattern=output_pattern + ) + for file in files + ] + + if max_workers == 1: + _ = dask.compute(*tasks, scheduler='single-threaded') + else: + _ = dask.compute(*tasks, scheduler='threads', num_workers=max_workers) + + logger.info('Finished converting %s files.', len(files)) + + +if __name__ == '__main__': + default_output_pattern = '/projects/pxs/nasa_polar/standardized/{year}' + default_output_pattern += '/{doy}/nacomposite_{timestamp}.nc' + parser = argparse.ArgumentParser() + parser.add_argument( + 'input_pattern', type=str, help='File pattern for input_files.' + ) + parser.add_argument( + '-output_pattern', + type=str, + default=default_output_pattern, + help='File pattern for output files.', + ) + parser.add_argument( + '-max_workers', + type=int, + default=10, + help='Number of workers to use for parallel file conversion', + ) + args = parser.parse_args() + run_jobs( + input_pattern=args.input_pattern, + output_pattern=args.output_pattern, + max_workers=args.max_workers, + ) diff --git a/nsrdb/utilities/__init__.py b/nsrdb/utilities/__init__.py index 3b7ac8e0..544d5ac8 100755 --- a/nsrdb/utilities/__init__.py +++ b/nsrdb/utilities/__init__.py @@ -21,6 +21,7 @@ class ModuleName(str, Enum): COLLECT_AGGREGATE = 'collect-aggregate' COLLECT_DATA_MODEL = 'collect-data-model' COLLECT_FINAL = 'collect-final' + COLLECT_DAILY = 'collect-daily' TMY = 'tmy' COLLECT_TMY = 'collect-tmy' diff --git a/nsrdb/utilities/cli.py b/nsrdb/utilities/cli.py index d51f9ad7..77353314 100644 --- a/nsrdb/utilities/cli.py +++ b/nsrdb/utilities/cli.py @@ -440,7 +440,6 @@ def kickoff_job(cls, ctx, module_name, func, config, log_id=None): f'{fun_str};\n' 't_elap = time.time() - t0;\n' ) - cmd += cls.get_status_cmd(config, pipeline_step) ctx.obj['JOB_NAME'] = config['job_name'] @@ -534,7 +533,6 @@ def kickoff_multiday( config_dict['date'] = date config_dict['job_name'] = f'{ctx.obj["RUN_NAME"]}_{log_id}' config_dict['doy'] = doy - cls.kickoff_job( ctx, module_name=module_name, diff --git a/nsrdb/utilities/sky_class.py b/nsrdb/utilities/sky_class.py index 941e00ee..26805ca0 100755 --- a/nsrdb/utilities/sky_class.py +++ b/nsrdb/utilities/sky_class.py @@ -108,7 +108,11 @@ def __init__( self._sza_lim = sza_lim self._handle_surf = Surfrad(self._fp_surf) - Handler = MultiFileResource if '*' in self._fp_nsrdb else Resource + Handler = ( + MultiFileResource + if '*' in self._fp_nsrdb or isinstance(self._fp_nsrdb, list) + else Resource + ) self._handle_nsrdb = Handler(self._fp_nsrdb) def __enter__(self): @@ -298,10 +302,13 @@ def add_validation_data(self, df): """Add NSRDB and SURFRAD ghi and dni data to a DataFrame.""" df = df.reindex(self.nsrdb_time_index) assert len(df) == len(self.nsrdb_time_index) - ti_deltas = self.nsrdb_time_index -
np.roll(self.nsrdb_time_index, 1) + ti_deltas = ( + self.nsrdb_time_index.values[1:] + - self.nsrdb_time_index.values[:-1] + ) ti_deltas_minutes = pd.Series(ti_deltas).dt.seconds / 60 ti_delta_minutes = int(mode(ti_deltas_minutes)[0]) - freq = '{}T'.format(ti_delta_minutes) + freq = '{}min'.format(ti_delta_minutes) df = df.drop(['ghi_ground', 'clear'], axis=1) surf_df = self.surfrad.get_df( dt_out=freq, window_minutes=self._window_min diff --git a/pyproject.toml b/pyproject.toml index 017244b6..6b73a61a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,8 @@ dependencies = [ "NREL-farms>=1.0.6", "scikit-learn>=1.0", "NREL-rest2>=1.0.2", - "NREL-mlclouds>=0.0.1", + "NREL-mlclouds>=0.0.5", + "dask>=2024.8.0" ] [project.optional-dependencies]
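A minimal usage sketch for the new ``collect-daily`` command registered above, mirroring the style of the other collection modules. The directory, output file name, and dataset list are hypothetical placeholders and any execution-control options are omitted; the ``-c`` config supplies the keyword arguments that are passed through to ``Collector.collect_daily``::

    $ python -m nsrdb.cli collect-daily -c '{"collect_dir": "./daily", "fn_out": "./nsrdb_2020.h5", "dsets": ["cloud_type", "cld_opd_dcomp"]}'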