import math
import warnings
import hydrostats as hs
import hydrostats.data as hd
import numpy as np
import pandas as pd
from scipy import interpolate
from scipy import stats
from .data import sfdc_for_river_id
from .data import retrospective
__all__ = [
'correct_historical',
'correct_forecast',
'statistics_tables',
]
[docs]
def correct_historical(simulated_data: pd.DataFrame, observed_data: pd.DataFrame) -> pd.DataFrame:
"""
Accepts a historically simulated flow timeseries and observed flow timeseries and attempts to correct biases in the
simulation on a monthly basis.
Args:
simulated_data: A dataframe with a datetime index and a single column of streamflow values
observed_data: A dataframe with a datetime index and a single column of streamflow values
Returns:
pandas DataFrame with a datetime index and a single column of streamflow values
"""
# list of the unique months in the historical simulation. should always be 1->12 but just in case...
unique_simulation_months = sorted(set(simulated_data.index.strftime('%m')))
dates = []
values = []
for month in unique_simulation_months:
# filter historic data to only be current month
monthly_simulated = simulated_data[simulated_data.index.month == int(month)].dropna()
to_prob = _flow_and_probability_mapper(monthly_simulated, to_probability=True)
# filter the observations to current month
monthly_observed = observed_data[observed_data.index.month == int(month)].dropna()
to_flow = _flow_and_probability_mapper(monthly_observed, to_flow=True)
dates += monthly_simulated.index.to_list()
value = to_flow(to_prob(monthly_simulated.values))
values += value.tolist()
corrected = pd.DataFrame(data=values, index=dates, columns=['Corrected Simulated Streamflow'])
corrected.sort_index(inplace=True)
return corrected
def sfdc_bias_correction(river_id: int) -> pd.DataFrame:
"""
Corrects the bias in the simulated data for a given river_id using the SFDC method. This method is based on the
https://github.com/rileyhales/saber-hbc repository.
Args: river_id: int: a 9 digit integer that is a valid GEOGLOWS River ID number
Returns: pandas DataFrame with a DateTime index and columns with Simulated flow, Bias Corrected Simulation flow.
"""
simulated_data = retrospective(river_id=river_id)
print(simulated_data)
sfdc_b = sfdc_for_river_id(river_id=river_id)
sim_fdc_b = []
for i in range(1, 13):
mdf = fdc(simulated_data[simulated_data.index.month == i].values.flatten()).rename(columns={'Q': 'fdc'}).reset_index()
mdf['month'] = i
sim_fdc_b.append(mdf)
sim_fdc_b = pd.concat(sim_fdc_b, axis=0)
# Process each month from January (1) to December (12)
monthly_results = []
for month in sorted(set(simulated_data.index.month)):
mon_sim_b = simulated_data[simulated_data.index.month == month].dropna().clip(lower=0)
qb_original = mon_sim_b[river_id].values.flatten()
scalar_fdc = sfdc_b[sfdc_b['month'] == month][['p_exceed', 'sfdc']].set_index('p_exceed')
sim_fdc_b_m = sim_fdc_b[sim_fdc_b['month'] == month][['p_exceed', 'fdc']].set_index('p_exceed')
flow_to_percent = _make_interpolator(sim_fdc_b_m.values.flatten(),
sim_fdc_b_m.index,
extrap='nearest',
fill_value=None)
percent_to_scalar = _make_interpolator(scalar_fdc.index,
scalar_fdc.values.flatten(),
extrap='nearest',
fill_value=None)
p_exceed = flow_to_percent(qb_original)
scalars = percent_to_scalar(p_exceed)
qb_adjusted = qb_original / scalars
# Create a DataFrame for this month's results
month_df = pd.DataFrame({
'date': mon_sim_b.index,
'Simulated': qb_original,
'Bias Corrected Simulation': qb_adjusted
})
# Append the month's DataFrame to the results list
monthly_results.append(month_df)
return pd.concat(monthly_results).set_index('date').sort_index()
def make_sfdc(simulated_df: pd.DataFrame, gauge_df: pd.DataFrame,
use_log: bool = False, fix_seasonally: bool = True, empty_months: str = 'skip',
drop_outliers: bool = False, outlier_threshold: int or float = 2.5,
filter_scalar_fdc: bool = False, filter_range: tuple = (0, 80),
extrapolate: str = 'nearest', fill_value: int or float = None,
fit_gumbel: bool = False, fit_range: tuple = (10, 90),
metadata: bool = False, ) -> pd.DataFrame:
"""
Makes a scalar flow duration curve (SFDC) mapping from simulated to observed (gauge) flow data. SFDC is used in SABER to
correct timeseries data for a ungauged riverid.
Args:
simulated_data: A dataframe with a datetime index and a single column of simulated streamflow values
gauge_df: A dataframe with a datetime index and a single column of observed streamflow values
extrapolate: Method to use for extrapolation. Options: nearest, const, linear, average, max, min
fill_value: Value to use for extrapolation when extrapolate_method='const'
filter_range: Lower and upper bounds of the filter range
drop_outliers: Flag to exclude outliers
outlier_threshold: Number of std deviations from mean to exclude from flow duration curve
Returns:
pandas DataFrame with a DateTime index and columns with corrected flow, uncorrected flow, the scalar adjustment
factor applied to correct the discharge, and the percentile of the uncorrected flow (in the seasonal grouping,
if applicable).
"""
if fix_seasonally:
# list of the unique months in the historical simulation. should always be 1->12 but just in case...
monthly_results = []
for month in sorted(set(simulated_df.index.strftime('%m'))):
# filter data to current iteration's month
mon_obs_a = gauge_df[gauge_df.index.month == int(month)].dropna().clip(lower=0)
if mon_obs_a.empty:
if empty_months == 'skip':
continue
else:
raise ValueError(f'Invalid value for argument "empty_months". Given: {empty_months}.')
mon_sim_b = simulated_df[simulated_df.index.month == int(month)].dropna().clip(lower=0)
monthly_results.append(make_sfdc(
mon_obs_a, mon_sim_b,
fix_seasonally=False, empty_months=empty_months,
drop_outliers=drop_outliers, outlier_threshold=outlier_threshold,
filter_scalar_fdc=filter_scalar_fdc, filter_range=filter_range,
extrapolate=extrapolate, fill_value=fill_value,
fit_gumbel=fit_gumbel, fit_range=fit_range, )
)
# combine the results from each monthly into a single dataframe (sorted chronologically) and return it
return pd.concat(monthly_results).sort_index()
if drop_outliers:
simulated_df = _drop_outliers_by_zscore(simulated_df, threshold=outlier_threshold)
gauge_df = _drop_outliers_by_zscore(gauge_df, threshold=outlier_threshold)
# compute the flow duration curves
sim_fdc = fdc(simulated_df)
obs_fdc = fdc(gauge_df)
# calculate the scalar flow duration curve
scalar_fdc = sfdc(sim_fdc, obs_fdc)
return scalar_fdc
def fdc(flows: np.array, steps: int = 101, col_name: str = 'Q') -> pd.DataFrame:
"""
Compute flow duration curve (exceedance probabilities) from a list of flows
Args:
flows: array of flows
steps: number of steps (exceedance probabilities) to use in the FDC
col_name: name of the column in the returned dataframe
Returns:
pd.DataFrame with index 'p_exceed' and columns 'Q' (or col_name)
"""
# calculate the FDC and save to parquet
exceed_prob = np.linspace(100, 0, steps)
fdc_flows = np.nanpercentile(flows, exceed_prob)
df = pd.DataFrame(fdc_flows, columns=[col_name, ], index=exceed_prob)
df.index.name = 'p_exceed'
return df
def _drop_outliers_by_zscore(df: pd.DataFrame, threshold: float = 3) -> pd.DataFrame:
"""
Drop outliers from a dataframe by their z-score and a threshold
Based on https://stackoverflow.com/questions/23199796/detect-and-exclude-outliers-in-pandas-data-frame
Args:
df: dataframe to drop outliers from
threshold: z-score threshold
Returns:
pd.DataFrame with outliers removed
"""
return df[(np.abs(stats.zscore(df)) < threshold).all(axis=1)]
def sfdc(sim_fdc: pd.DataFrame, obs_fdc: pd.DataFrame) -> pd.DataFrame:
"""
Compute the scalar flow duration curve (exceedance probabilities) from two flow duration curves
Args:
sim_fdc: simulated flow duration curve
obs_fdc: observed flow duration curve
Returns:
pd.DataFrame with index (exceedance probabilities) and a column of scalars
"""
scalars_df = pd.DataFrame(np.divide(sim_fdc.values.flatten(), obs_fdc.values.flatten()),
columns=['scalars', ],
index=sim_fdc.index
)
scalars_df.replace(np.inf, np.nan, inplace=True)
scalars_df.dropna(inplace=True)
return scalars_df
def _make_interpolator(x: np.array, y: np.array, extrap: str = 'nearest',
fill_value: int or float = None) -> interpolate.interp1d:
"""
Make an interpolator from two arrays
Args:
x: x values
y: y values
extrap: method for extrapolation: nearest, const, linear, average, max, min
fill_value: value to use when extrap='const'
Returns:
interpolate.interp1d
"""
# todo check that flows are not negative and have sufficient variance - even for small variance in SAF
# if np.max(y) - np.min(y) < 5:
# logger.warning('The y data has similar max/min values. You may get unanticipated results.')
# make interpolator which converts the percentiles to scalars
if extrap == 'nearest':
return interpolate.interp1d(x, y, fill_value='extrapolate', kind='nearest')
elif extrap == 'const':
if fill_value is None:
raise ValueError('Must provide the const kwarg when extrap_method="const"')
return interpolate.interp1d(x, y, fill_value=fill_value, bounds_error=False)
elif extrap == 'linear':
return interpolate.interp1d(x, y, fill_value='extrapolate')
elif extrap == 'average':
return interpolate.interp1d(x, y, fill_value=np.mean(y), bounds_error=False)
elif extrap == 'max' or extrap == 'maximum':
return interpolate.interp1d(x, y, fill_value=np.max(y), bounds_error=False)
elif extrap == 'min' or extrap == 'minimum':
return interpolate.interp1d(x, y, fill_value=np.min(y), bounds_error=False)
else:
raise ValueError('Invalid extrapolation method provided')
[docs]
def correct_forecast(forecast_data: pd.DataFrame, simulated_data: pd.DataFrame,
observed_data: pd.DataFrame, use_month: int = 0) -> pd.DataFrame:
"""
Accepts a short term forecast of streamflow, simulated historical flow, and observed flow timeseries and attempts
to correct biases in the forecasted data
Args:
forecast_data: A dataframe with a datetime index and any number of columns of forecasted flow. Compatible with
forecast_stats, forecast_ensembles, forecast_records
simulated_data: A dataframe with a datetime index and a single column of streamflow values
observed_data: A dataframe with a datetime index and a single column of streamflow values
use_month: Optional: either 0 for correct the forecast based on the first month of the forecast data or -1 if
you want to correct based on the ending month of the forecast data
Returns:
pandas DataFrame with a copy of forecasted data with values updated in each column
"""
# make a copy of the forecasts which we update and return so the original data is not changed
forecast_copy = forecast_data.copy()
# make the flow and probability interpolation functions
monthly_simulated = simulated_data[simulated_data.index.month == forecast_copy.index[use_month].month].dropna()
monthly_observed = observed_data[observed_data.index.month == forecast_copy.index[use_month].month].dropna()
to_prob = _flow_and_probability_mapper(monthly_simulated, to_probability=True, extrapolate=True)
to_flow = _flow_and_probability_mapper(monthly_observed, to_flow=True, extrapolate=True)
# for each column of forecast data, make the interpolation function and update the dataframe
for column in forecast_copy.columns:
tmp = forecast_copy[column].dropna()
forecast_copy.update(pd.DataFrame(to_flow(to_prob(tmp.values)), index=tmp.index, columns=[column]))
return forecast_copy
[docs]
def statistics_tables(corrected: pd.DataFrame, simulated: pd.DataFrame, observed: pd.DataFrame,
merged_sim_obs: pd.DataFrame = False, merged_cor_obs: pd.DataFrame = False,
metrics: list = None) -> str:
"""
Makes an html table of various statistical metrics for corrected vs observed data alongside the same metrics for
the simulated vs observed data as a way to see the improvement made by the bias correction. This function uses
hydrostats.data.merge_data on the 3 inputs. If you have already computed these because you are doing a full
comparison of bias correction, you can provide them to save time
Args:
corrected: A dataframe with a datetime index and a single column of streamflow values
simulated: A dataframe with a datetime index and a single column of streamflow values
observed: A dataframe with a datetime index and a single column of streamflow values
merged_sim_obs: (optional) if you have already computed it, hydrostats.data.merge_data(simulated, observed)
merged_cor_obs: (optional) if you have already computed it, hydrostats.data.merge_data(corrected, observed)
metrics: A list of abbreviated statistic names. See the documentation for HydroErr
"""
if corrected is False and simulated is False and observed is False:
if merged_sim_obs is not False and merged_cor_obs is not False:
pass # if you provided the merged dataframes already, we use those
else:
# merge the datasets together
merged_sim_obs = hd.merge_data(sim_df=simulated, obs_df=observed)
merged_cor_obs = hd.merge_data(sim_df=corrected, obs_df=observed)
if metrics is None:
metrics = ['ME', 'RMSE', 'NRMSE (Mean)', 'MAPE', 'NSE', 'KGE (2009)', 'KGE (2012)']
# Merge Data
table1 = hs.make_table(merged_dataframe=merged_sim_obs, metrics=metrics)
table2 = hs.make_table(merged_dataframe=merged_cor_obs, metrics=metrics)
table2 = table2.rename(index={'Full Time Series': 'Corrected Full Time Series'})
table1 = table1.rename(index={'Full Time Series': 'Original Full Time Series'})
table1 = table1.transpose()
table2 = table2.transpose()
table_final = pd.merge(table1, table2, right_index=True, left_index=True)
return table_final.to_html()
def _flow_and_probability_mapper(monthly_data: pd.DataFrame, to_probability: bool = False,
to_flow: bool = False, extrapolate: bool = False) -> interpolate.interp1d:
if not to_flow and not to_probability:
raise ValueError('You need to specify either to_probability or to_flow as True')
# get maximum value to bound histogram
max_val = math.ceil(np.max(monthly_data.max()))
min_val = math.floor(np.min(monthly_data.min()))
if max_val == min_val:
warnings.warn('The observational data has the same max and min value. You may get unanticipated results.')
max_val += .1
# determine number of histograms bins needed
number_of_points = len(monthly_data.values)
number_of_classes = math.ceil(1 + (3.322 * math.log10(number_of_points)))
# specify the bin width for histogram (in m3/s)
step_width = (max_val - min_val) / number_of_classes
# specify histogram bins
bins = np.arange(-np.min(step_width), max_val + 2 * np.min(step_width), np.min(step_width))
if bins[0] == 0:
bins = np.concatenate((-bins[1], bins))
elif bins[0] > 0:
bins = np.concatenate((-bins[0], bins))
# make the histogram
counts, bin_edges = np.histogram(monthly_data, bins=bins)
# adjust the bins to be the center
bin_edges = bin_edges[1:]
# normalize the histograms
counts = counts.astype(float) / monthly_data.size
# calculate the cdfs
cdf = np.cumsum(counts)
# interpolated function to convert simulated streamflow to prob
if to_probability:
if extrapolate:
func = interpolate.interp1d(bin_edges, cdf, fill_value='extrapolate')
else:
func = interpolate.interp1d(bin_edges, cdf)
return lambda x: np.clip(func(x), 0, 1)
# interpolated function to convert simulated prob to observed streamflow
elif to_flow:
if extrapolate:
return interpolate.interp1d(cdf, bin_edges, fill_value='extrapolate')
return interpolate.interp1d(cdf, bin_edges)