Source code for geoglows.bias

import math
import warnings

import hydrostats as hs
import hydrostats.data as hd
import numpy as np
import pandas as pd
from scipy import interpolate

from .analyze import fdc
from .data import retrospective
from .data import sfdc_for_river_id

__all__ = [
    'correct_historical',
    'correct_forecast',
    'statistics_tables',
    'sfdc_bias_correction',
]


[docs] def correct_historical(simulated_data: pd.DataFrame, observed_data: pd.DataFrame) -> pd.DataFrame: """ Accepts a historically simulated flow timeseries and observed flow timeseries and attempts to correct biases in the simulation on a monthly basis. Args: simulated_data: A dataframe with a datetime index and a single column of streamflow values observed_data: A dataframe with a datetime index and a single column of streamflow values Returns: pandas DataFrame with a datetime index and a single column of streamflow values """ # list of the unique months in the historical simulation. should always be 1->12 but just in case... unique_simulation_months = sorted(set(simulated_data.index.strftime('%m'))) dates = [] values = [] for month in unique_simulation_months: # filter historic data to only be current month monthly_simulated = simulated_data[simulated_data.index.month == int(month)].dropna() to_prob = _flow_and_probability_mapper(monthly_simulated, to_probability=True) # filter the observations to current month monthly_observed = observed_data[observed_data.index.month == int(month)].dropna() to_flow = _flow_and_probability_mapper(monthly_observed, to_flow=True) dates += monthly_simulated.index.to_list() value = to_flow(to_prob(monthly_simulated.values)) values += value.tolist() corrected = pd.DataFrame(data=values, index=dates, columns=['Corrected Simulated Streamflow']) corrected.sort_index(inplace=True) return corrected
def sfdc_bias_correction(river_id: int) -> pd.DataFrame: """ Corrects the bias in the simulated data for a given river_id using the SFDC method. This method is based on the https://github.com/rileyhales/saber-hbc repository. Args: river_id: int: a 9 digit integer that is a valid GEOGLOWS River ID number Returns: pandas DataFrame with a DateTime index and columns with Simulated flow, Bias Corrected Simulation flow. """ assert isinstance(river_id, int), 'river_id must be an integer' sim_data = retrospective(river_id=river_id) sfdc_b = sfdc_for_river_id(river_id=river_id) sim_fdc_b = [] for i in range(1, 13): mdf = fdc(sim_data[sim_data.index.month == i].values.flatten()).rename(columns={'Q': 'fdc'}).reset_index() mdf['month'] = i sim_fdc_b.append(mdf) sim_fdc_b = pd.concat(sim_fdc_b, axis=0) # Process each month from January (1) to December (12) monthly_results = [] for month in sorted(set(sim_data.index.month)): mon_sim_b = sim_data[sim_data.index.month == month].dropna().clip(lower=0) qb_original = mon_sim_b[river_id].values.flatten() scalar_fdc = sfdc_b[sfdc_b['month'] == month][['p_exceed', 'sfdc']].set_index('p_exceed') sim_fdc_b_m = sim_fdc_b[sim_fdc_b['month'] == month][['p_exceed', 'fdc']].set_index('p_exceed') flow_to_percent = _make_interpolator(sim_fdc_b_m.values.flatten(), sim_fdc_b_m.index, extrap='nearest', fill_value=None) percent_to_scalar = _make_interpolator(scalar_fdc.index, scalar_fdc.values.flatten(), extrap='nearest', fill_value=None) p_exceed = flow_to_percent(qb_original) scalars = percent_to_scalar(p_exceed) qb_adjusted = qb_original / scalars # Create a DataFrame for this month's results month_df = pd.DataFrame({ 'date': mon_sim_b.index, 'Simulated': qb_original, 'Bias Corrected Simulation': qb_adjusted }) # Append the month's DataFrame to the results list monthly_results.append(month_df) return pd.concat(monthly_results).set_index('date').sort_index() def _make_interpolator(x: np.array, y: np.array, extrap: str = 'nearest', fill_value: int or float = None) -> interpolate.interp1d: """ Make an interpolator from two arrays Args: x: x values y: y values extrap: method for extrapolation: nearest, const, linear, average, max, min fill_value: value to use when extrap='const' Returns: interpolate.interp1d """ # todo check that flows are not negative and have sufficient variance - even for small variance in SAF # if np.max(y) - np.min(y) < 5: # logger.warning('The y data has similar max/min values. You may get unanticipated results.') # make interpolator which converts the percentiles to scalars if extrap == 'nearest': return interpolate.interp1d(x, y, fill_value='extrapolate', kind='nearest') elif extrap == 'const': if fill_value is None: raise ValueError('Must provide the const kwarg when extrap_method="const"') return interpolate.interp1d(x, y, fill_value=fill_value, bounds_error=False) elif extrap == 'linear': return interpolate.interp1d(x, y, fill_value='extrapolate') elif extrap == 'average': return interpolate.interp1d(x, y, fill_value=np.mean(y), bounds_error=False) elif extrap == 'max' or extrap == 'maximum': return interpolate.interp1d(x, y, fill_value=np.max(y), bounds_error=False) elif extrap == 'min' or extrap == 'minimum': return interpolate.interp1d(x, y, fill_value=np.min(y), bounds_error=False) else: raise ValueError('Invalid extrapolation method provided')
[docs] def correct_forecast(forecast_data: pd.DataFrame, simulated_data: pd.DataFrame, observed_data: pd.DataFrame, use_month: int = 0) -> pd.DataFrame: """ Accepts a short term forecast of streamflow, simulated historical flow, and observed flow timeseries and attempts to correct biases in the forecasted data Args: forecast_data: A dataframe with a datetime index and any number of columns of forecasted flow. Compatible with forecast_stats, forecast_ensembles, forecast_records simulated_data: A dataframe with a datetime index and a single column of streamflow values observed_data: A dataframe with a datetime index and a single column of streamflow values use_month: Optional: either 0 for correct the forecast based on the first month of the forecast data or -1 if you want to correct based on the ending month of the forecast data Returns: pandas DataFrame with a copy of forecasted data with values updated in each column """ # make a copy of the forecasts which we update and return so the original data is not changed forecast_copy = forecast_data.copy() # make the flow and probability interpolation functions monthly_simulated = simulated_data[simulated_data.index.month == forecast_copy.index[use_month].month].dropna() monthly_observed = observed_data[observed_data.index.month == forecast_copy.index[use_month].month].dropna() to_prob = _flow_and_probability_mapper(monthly_simulated, to_probability=True, extrapolate=True) to_flow = _flow_and_probability_mapper(monthly_observed, to_flow=True, extrapolate=True) # for each column of forecast data, make the interpolation function and update the dataframe for column in forecast_copy.columns: tmp = forecast_copy[column].dropna() forecast_copy.update(pd.DataFrame(to_flow(to_prob(tmp.values)), index=tmp.index, columns=[column])) return forecast_copy
[docs] def statistics_tables(corrected: pd.DataFrame, simulated: pd.DataFrame, observed: pd.DataFrame, merged_sim_obs: pd.DataFrame = False, merged_cor_obs: pd.DataFrame = False, metrics: list = None) -> str: """ Makes an html table of various statistical metrics for corrected vs observed data alongside the same metrics for the simulated vs observed data as a way to see the improvement made by the bias correction. This function uses hydrostats.data.merge_data on the 3 inputs. If you have already computed these because you are doing a full comparison of bias correction, you can provide them to save time Args: corrected: A dataframe with a datetime index and a single column of streamflow values simulated: A dataframe with a datetime index and a single column of streamflow values observed: A dataframe with a datetime index and a single column of streamflow values merged_sim_obs: (optional) if you have already computed it, hydrostats.data.merge_data(simulated, observed) merged_cor_obs: (optional) if you have already computed it, hydrostats.data.merge_data(corrected, observed) metrics: A list of abbreviated statistic names. See the documentation for HydroErr """ if corrected is False and simulated is False and observed is False: if merged_sim_obs is not False and merged_cor_obs is not False: pass # if you provided the merged dataframes already, we use those else: # merge the datasets together merged_sim_obs = hd.merge_data(sim_df=simulated, obs_df=observed) merged_cor_obs = hd.merge_data(sim_df=corrected, obs_df=observed) if metrics is None: metrics = ['ME', 'RMSE', 'NRMSE (Mean)', 'MAPE', 'NSE', 'KGE (2009)', 'KGE (2012)'] # Merge Data table1 = hs.make_table(merged_dataframe=merged_sim_obs, metrics=metrics) table2 = hs.make_table(merged_dataframe=merged_cor_obs, metrics=metrics) table2 = table2.rename(index={'Full Time Series': 'Corrected Full Time Series'}) table1 = table1.rename(index={'Full Time Series': 'Original Full Time Series'}) table1 = table1.transpose() table2 = table2.transpose() table_final = pd.merge(table1, table2, right_index=True, left_index=True) return table_final.to_html()
def _flow_and_probability_mapper(monthly_data: pd.DataFrame, to_probability: bool = False, to_flow: bool = False, extrapolate: bool = False) -> interpolate.interp1d: if not to_flow and not to_probability: raise ValueError('You need to specify either to_probability or to_flow as True') # get maximum value to bound histogram max_val = math.ceil(np.max(monthly_data.max())) min_val = math.floor(np.min(monthly_data.min())) if max_val == min_val: warnings.warn('The observational data has the same max and min value. You may get unanticipated results.') max_val += .1 # determine number of histograms bins needed number_of_points = len(monthly_data.values) number_of_classes = math.ceil(1 + (3.322 * math.log10(number_of_points))) # specify the bin width for histogram (in m3/s) step_width = (max_val - min_val) / number_of_classes # specify histogram bins bins = np.arange(-np.min(step_width), max_val + 2 * np.min(step_width), np.min(step_width)) if bins[0] == 0: bins = np.concatenate((-bins[1], bins)) elif bins[0] > 0: bins = np.concatenate((-bins[0], bins)) # make the histogram counts, bin_edges = np.histogram(monthly_data, bins=bins) # adjust the bins to be the center bin_edges = bin_edges[1:] # normalize the histograms counts = counts.astype(float) / monthly_data.size # calculate the cdfs cdf = np.cumsum(counts) # interpolated function to convert simulated streamflow to prob if to_probability: if extrapolate: func = interpolate.interp1d(bin_edges, cdf, fill_value='extrapolate') else: func = interpolate.interp1d(bin_edges, cdf) return lambda x: np.clip(func(x), 0, 1) # interpolated function to convert simulated prob to observed streamflow elif to_flow: if extrapolate: return interpolate.interp1d(cdf, bin_edges, fill_value='extrapolate') return interpolate.interp1d(cdf, bin_edges)