from typing import Optional, Tuple
from scipy.stats import circmean
import numpy as np
import pandas as pd
from agroecometrics import settings
import agroecometrics as AEM
# Gets the acutal LABELS of columns based on the user settings
LABELS = settings.get_labels()
DAY_SECONDS = 24*60*60
YEAR_SECONDS = DAY_SECONDS*365
# Helper Functions
[docs]
def compute_esat(temp: np.ndarray) -> np.ndarray:
"""
Computes saturation vapor pressure based Tetens formula
Args:
temp: Temperature to calculate the saturation vapor pressure (°C)
Returns:
Computed saturation vapor pressure (kPa)
"""
e_sat = 0.6108 * np.exp(17.27 * temp/(temp+237.3))
return e_sat
[docs]
def compute_Ra(doy: np.ndarray, latitude: float) -> np.ndarray:
"""
Computes extra-terrestrial solar radiation using the FAO Penman-Monteith method
Args:
doy: Day of year (0-365) where January 1st is 0 and 365
latitude: Latitude in decimal degress. Where the northern hemisphere is
positive and the southern hemisphere is negative
Returns:
Extra-terrestrial solar radiation (Ra) in MJ/m²/day
"""
dr = 1 + 0.033 * np.cos(2 * np.pi * doy/365) # Inverse relative distance Earth-Sun
phi = np.pi / 180 * latitude # Latitude in radians
d = 0.409 * np.sin((2 * np.pi * doy/365) - 1.39) # Solar delcination
omega = np.arccos(-np.tan(phi) * np.tan(d)) # Sunset hour angle
Gsc = 0.0820 # Solar constant
Ra = 24 * 60 / np.pi * Gsc * dr
Ra = Ra * (omega * np.sin(phi) * np.sin(d) + np.cos(phi) * np.cos(d) * np.sin(omega))
return Ra
# Air Temperature model
[docs]
def model_air_temp(df: pd.DataFrame) -> np.ndarray:
'''
Creates an Air temperature model and creates air temperature estimates using model
Args:
df: DataFrame containing temperature data
Returns:
A numpy array of predicted daily temperatures over the period of the dataframe
'''
global LABELS
# Check Parameters
if LABELS['temp_avg'] not in df.columns:
raise ValueError(f"{LABELS['temp_avg']} was not found in the df. Please check your temp_avg label.")
if LABELS['date_time'] not in df.columns:
raise ValueError(f"{LABELS['date_time']} was not found in the df. Please check your date_time label.")
# Extract air temperatures
air_temp = df[LABELS['temp_avg']]
# Estimate sinusoidal parameters
avg_temp = air_temp.mean() # Units: °C
thermal_amp = (max(air_temp) - min(air_temp)) / 2 # Units: °C
min_date = df.loc[air_temp.idxmin(), LABELS['date_time']] # Datetime object
min_doy = (min_date - pd.Timestamp(min_date.year, 1, 1)).days # Units: Days
# Generate daily temperature predictions using the model
T_pred = avg_temp + thermal_amp * np.cos(2 * np.pi * ((df[LABELS['doy']] - min_doy) / 365) + np.pi)
return np.asarray(T_pred)
# Soil Temperature models
[docs]
def soil_temp_at_depth(
depth: int,
avg_temp: int=25,
thermal_amp: int = 10,
thermal_dif: float = 0.203,
time_lag: int = 15
) -> np.ndarray:
"""
Models soil temperature over one year at the given depth
Args:
depth: The depth to model the soil temperature (m)
avg_temp: The annual average surface temperature (°C)
thermal_amp: The annual thermal amplitude of the soil surface (°C)
thermal_dif: The Thermal diffusivity obtained from KD2 Pro instrument (mm^2 / s)
time_lag: The time lag between Jaunary 1st and the coldest day of the year (days)
Return:
Predicted soil temperature at the given depth for each day of the year (°C)
"""
# Set Constants
OMEGA = 2 * np.pi / YEAR_SECONDS # Phase Frequency | Units: 1 / s
PHASE = np.pi/2 + OMEGA * (time_lag * DAY_SECONDS) # Phase constant | Units:
# Calculate Damping Depth
thermal_dif = thermal_dif / 100 # Unit Conversion: cm^2 / s
damp_depth = (2*thermal_dif/OMEGA)**(1/2) # Units: cm
# Estimate the temperatures for each day of the year
doy = np.arange(0,365)*DAY_SECONDS # Units: s
soil_temp = avg_temp + \
thermal_amp * np.exp(-depth / damp_depth) * \
np.sin(OMEGA * doy - depth / damp_depth - PHASE)
return np.asarray(soil_temp)
[docs]
def soil_temp_on_day(
doy: int,
max_depth: int,
Nz: int = 100,
avg_temp: int = 25,
thermal_amp: int = 10,
thermal_dif: int = 0.203,
timelag: int = 15
) -> Tuple[np.ndarray, np.ndarray]:
"""
Models soil temperature on a particular day of the year
Args:
doy: Day of year (0-365) where January 1st is 0 and 365
max_depth: The maximum depth in centimeters to model the soil temperature
Nz: The number of interpolated depths to caluculate soil temperature at
avg_temp: The annual average surface temperature (°C)
thermal_amp: The annual thermal amplitude of the soil surface (°C)
thermal_dif: The Thermal diffusivity obtained from KD2 Pro instrument [mm^2/s]
time_lag: The time lag in days (0-365) where January 1st is 0 and 365
Returns:
An np array of the soil temps and an np array of the depths of the soil temps (°C)
"""
# Set Constants
OMEGA = 2 * np.pi / (YEAR_SECONDS) # Phase Frequency | Units: 1 / s
PHASE = np.pi / 2 + OMEGA * (timelag * DAY_SECONDS) # Phase constant
# Calculate Damping Depth
thermal_dif = thermal_dif / 100 # Unit Conversion: cm^2 / s
damp_depth = (2 * thermal_dif / OMEGA) ** 0.5 # Units: cm
# Estimate the temperatures for each depth
depths = np.linspace(0, max_depth, Nz) # Interpolation depths | Units: cm
T_soil = avg_temp + \
thermal_amp * np.exp(-depths / damp_depth) * \
np.sin(OMEGA * doy - depths / damp_depth - PHASE)
return (np.asarray(depths), np.asarray(T_soil))
[docs]
def soil_temp_at_depth_on_day(
max_depth: int,
Nz: int = 1000,
avg_temp: int = 25,
thermal_amp: int = 10,
thermal_dif: int = 0.203,
timelag: int = 15
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Models soil temperature over a full year (0–365) and across depth.
Args:
max_depth: The maximum depth in centimeters to model the soil temperature
Nz: The number of interpolated depths to calculate soil temperature at
avg_temp: The annual average surface temperature (°C)
thermal_amp: The annual thermal amplitude of the soil surface (°C)
thermal_dif: The Thermal diffusivity obtained from KD2 Pro instrument [mm^2/s]
timelag: The time lag in days (0-365) where January 1st is 0 and 365
Returns:
A tuple of (doy_grid, z_grid, temp_grid), where each is a 2D NumPy array.
"""
# Set Constants
OMEGA = 2*np.pi/365
thermal_dif = thermal_dif / 100 * 86400 # convert to cm^2/day
phase_const = np.pi/2 + OMEGA*timelag # Phase constant
damp_depth = (2*thermal_dif/OMEGA)**(1/2) # Damping depth
doy = np.arange(1,366)
depths = np.linspace(0, max_depth, Nz) # Interpolation depths
doy_grid,z_grid = np.meshgrid(doy,depths)
t_grid = avg_temp + thermal_amp * np.exp(-z_grid/damp_depth) * np.sin(OMEGA*doy_grid - z_grid/damp_depth - phase_const)
return doy_grid, z_grid, t_grid
[docs]
def model_soil_temp_from_air_temp(
df: pd.DataFrame,
depth: float,
date: str,
date_format: str = LABELS['date_format'],
thermal_dif: int = 0.203,
) -> np.ndarray:
"""
Creates temperature predictions for every 5 minutes for a modeled date
Estimates the parameters for a sinusodial function modeling the daily surface temperature.
Uses the parameter estimations to create a model of soil temperatures at different depths
Args:
df: DataFrame containing temperature data, must contain 5_minute_temp as defined in settings.
depth: The depth in centimeters to model the soil temperature
date: Is the date which parameters will be estimated for.
date_format: Is the format that the date variable and data use for the date.
thermal_dif: The Thermal diffusivity obtained from KD2 Pro instrument [mm^2/s]
Returns:
A numpy array containing the predicted temperatures (°C) every 5 minutes
starting at midnight at the specified depth and a numpy array containing
the given air temperatures
"""
# Constants
global LABELS
OMEGA = 2 * np.pi / DAY_SECONDS # Phase Frequency | Units: 1 / s
# Calculate Damping Depth
thermal_dif = thermal_dif / 100 # Unit Conversion: cm^2 / s
damp_depth = (2 * thermal_dif / OMEGA) ** 0.5 # Units: cm
# Find all the entries in the dataframe with the correct date and sort by time
target_date = pd.to_datetime(date, format=date_format).normalize()
dates = pd.to_datetime(df[LABELS['date_norm']], format=date_format)
daily_df = df[dates == target_date].sort_values(LABELS['date_time'])
# Raise an exception if no data for the specified date could be found
if len(daily_df) == 0:
raise ValueError(f"Could not find data in the given dataframe for:\t{date}")
# Extract the 5-minute air temperatures and ensure all exist
air_temp = daily_df[LABELS['5_minute_temp']]
if len(air_temp) != DAY_SECONDS / (5*60):
raise ValueError(f"Expected 288 5-minute temperature values for {target_date.date()}, got {len(air_temp)}")
# Estimate sinusoidal parameters
avg_temp = air_temp.mean() # Units: °C
thermal_amp = (max(air_temp) - min(air_temp)) / 2 # Units: °C
min_time = daily_df.loc[air_temp.idxmin(), LABELS['date_time']]
timelag = (min_time.hour * 60 + min_time.minute) * 60 # Units: s
# Generate predictions for every 5 minutes
time = np.arange(0, DAY_SECONDS, 5 * 60) # Units: s
temp_predictions = avg_temp + \
thermal_amp * np.exp(-depth / damp_depth) * \
np.sin(OMEGA * (time - timelag) - (depth / damp_depth))
# DEBUGGING
soil_temp_actual = daily_df[LABELS['soil_temp4']]
# DEBUGGING
return temp_predictions, air_temp.to_numpy(), soil_temp_actual
[docs]
def model_soil_temp_at_depth_from_air_temp(
df: pd.DataFrame,
max_depth: float,
date: str,
Nz: int = 1000,
date_format: str = '%m/%d/%Y %I:%M %p',
thermal_dif: float = 0.203
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Models soil temperature over a full day (in 5-minute intervals) across depth
using air temperature data to parameterize a sinusoidal model.
Args:
df: DataFrame containing temperature data; must contain '5_minute_temp' and datetime.
max_depth: Maximum depth in centimeters to model the soil temperature.
date: The date (string) for which parameters will be estimated.
Nz: Number of interpolated depth points.
date_format: The format used for the date string.
thermal_dif: Thermal diffusivity [mm^2/s] from KD2 Pro instrument.
Returns:
A tuple of (time_grid, depth_grid, temp_grid), each a 2D NumPy array.
"""
# Constants
global LABELS
OMEGA = 2 * np.pi / DAY_SECONDS # Phase Frequency | Units: 1 / s
# Calculate Damping Depth
thermal_dif = thermal_dif / 100 # Unit Conversion: cm^2 / s
damp_depth = (2 * thermal_dif / OMEGA) ** 0.5 # Units: cm
# Find all the entries in the dataframe with the correct date and sort by time
target_date = pd.to_datetime(date, format=date_format).normalize()
dates = pd.to_datetime(df[LABELS['date_norm']], format=date_format)
daily_df = df[dates == target_date].sort_values(LABELS['date_time'])
# Raise an exception if no data for the specified date could be found
if len(daily_df) == 0:
raise ValueError(f"Could not find data in the given dataframe for:\t{date}")
# Extract the 5-minute air temperatures and ensure all exist
air_temp = daily_df[LABELS['5_minute_temp']]
if len(air_temp) != DAY_SECONDS / (5*60):
raise ValueError(f"Expected 288 5-minute temperature values for {target_date.date()}, got {len(air_temp)}")
# Estimate sinusoidal parameters
avg_temp = air_temp.mean() # Units: °C
thermal_amp = (max(air_temp) - min(air_temp)) / 2 # Units: °C
min_time = daily_df.loc[air_temp.idxmin(), LABELS['date_time']]
timelag = (min_time.hour * 60 + min_time.minute) * 60 # Units: s
# Create time and depth grids
time = np.arange(0, DAY_SECONDS, 5)
depths = np.linspace(max_depth/Nz, max_depth, Nz)
time_grid, depth_grid = np.meshgrid(time, depths)
# Generate predictions for the given times and depths
temp_grid = avg_temp + \
thermal_amp * np.exp(-depth_grid / damp_depth) * \
np.sin(OMEGA * (time_grid - timelag) - depth_grid / damp_depth)
return time_grid, depth_grid, temp_grid
# EvapoTranspiration Models
[docs]
def dalton(
temp_min: np.ndarray,
temp_max: np.ndarray,
RH_min: np.ndarray,
RH_max: np.ndarray,
wind_speed: np.ndarray
) -> np.ndarray:
"""
Computes Evapotranspiration using the Dalton model
Args:
temp_min: The minimum daily temperature (°C)
temp_max: The maximum daily temperature (°C)
RH_min: The minimum daily relative humidity (range: 0.0-1.0)
RH_max: The maximum daily relative humidity (range: 0.0-1.0)
wind_speed: The average daily wind speed in meters per second
Returns:
The Dalton models predictions for evapotranspiration clipped to a minimum of zero
"""
# Ensures parameter saftey
if not isinstance(temp_min, float) and not \
(len(temp_min) == len(temp_max) == len(RH_min) == len(RH_max) == len(wind_speed)):
raise ValueError("All inputs must be the same length")
# Model calulations
e_sat_min = compute_esat(temp_min)
e_sat_max = compute_esat(temp_max)
e_sat = (e_sat_min + e_sat_max)/2
e_atm = (e_sat_min*(RH_max/100) + e_sat_max*(RH_min/100))/ 2
PET = (3.648 + 0.7223*wind_speed)*(e_sat - e_atm)
# Ensures non-negative evapotranspiration
PET = np.clip(PET, 0.0, None)
return PET
[docs]
def penman(
temp_min: np.ndarray,
temp_max: np.ndarray,
RH_min: np.ndarray,
RH_max: np.ndarray,
wind_speed: np.ndarray
) -> np.ndarray:
"""
Computes evapotranspiration using the Penman model
Args:
temp_min: The minimum daily temperature (°C)
temp_max: The maximum daily temperature (°C)
RH_min: The minimum daily relative humidity (range: 0.0-1.0)
RH_max: The maximum daily relative humidity (range: 0.0-1.0)
wind_speed: The average daily wind speed in meters per second
Returns:
The Penman models predictions for evapotranspiration clipped to a minimum of zero
"""
# Ensure parameter saftey
if not isinstance(temp_min, float) and not \
(len(temp_min) == len(temp_max) == len(RH_min) == len(RH_max) == len(wind_speed)):
raise ValueError("All inputs must be the same length")
# Model calulations
e_sat_min = compute_esat(temp_min)
e_sat_max = compute_esat(temp_max)
e_sat = (e_sat_min + e_sat_max)/2
e_atm = (e_sat_min*(RH_max/100) + e_sat_max*(RH_min/100))/ 2
PET = (2.625 + 0.000479/wind_speed)*(e_sat - e_atm)
# Ensures non-negative evapotranspiration
PET = np.clip(PET, 0.0, None)
return PET
[docs]
def romanenko(
temp_min: np.ndarray,
temp_max: np.ndarray,
RH_min: np.ndarray,
RH_max: np.ndarray,
) -> np.ndarray:
"""
Potential evaporation model proposed by Romanenko in 1961
Args:
temp_min: The minimum daily temperature (°C)
temp_max: The maximum daily temperature (°C)
RH_min: The minimum daily relative humidity (range: 0.0-1.0)
RH_max: The maximum daily relative humidity (range: 0.0-1.0)
Returns:
The Romanenko models predictions for evapotranspiration clipped to a minimum of zero
"""
# Ensure parameter saftey
if not isinstance(temp_min, float) and not \
(len(temp_min) == len(temp_max) == len(RH_min) == len(RH_max)):
raise ValueError("All inputs must be the same length")
# Model calulations
temp_avg = (temp_min + temp_max)/2
RH_avg = (RH_min + RH_max)/2
PET = 0.00006*(25 + temp_avg)**2*(100 - RH_avg)
# Ensures non-negative evapotranspiration
PET = np.clip(PET, 0.0, None)
return PET
[docs]
def jensen_haise(
temp_min: np.ndarray,
temp_max: np.ndarray,
doy: np.ndarray,
latitude: np.ndarray
) -> np.ndarray:
"""
Computes evapotranspiration using the Jensen model
Args:
temp_min: The minimum daily temperature (°C)
temp_max: The maximum daily temperature (°C)
doy: Day of year (0-365) where January 1st is 0 and 365
latitude: Latitude in decimal degress. Where the northern hemisphere is
positive and the southern hemisphere is negative
Returns:
The Jensen-Haise models predictions for evapotranspiration clipped to a minimum of zero
"""
# Ensure parameter saftey
if not isinstance(temp_min, float) and not (len(temp_min) == len(temp_max)):
raise ValueError("All inputs must be the same length")
# Model Calculations
Ra = compute_Ra(doy, latitude)
T_avg = (temp_min + temp_max)/2
PET = 0.0102 * (T_avg+3) * Ra
# Ensures non-negative evapotranspiration
PET = np.clip(PET, 0.0, None)
return PET
[docs]
def hargreaves(
temp_min: np.ndarray,
temp_max: np.ndarray,
doy: np.ndarray,
latitude: float
) -> np.ndarray:
"""
Computes evapotranspiration using the Hargreaves model
Args:
temp_min: The minimum daily temperature (°C)
temp_max: The maximum daily temperature (°C)
doy: Day of year (0-365) where January 1st is 0 and 365
latitude: Latitude in decimal degress. Where the northern hemisphere is
positive and the southern hemisphere is negative
Returns:
The Hargreaves models predictions for evapotranspiration clipped to a minimum of zero
"""
# Ensure parameter saftey
if not isinstance(temp_min, float) and not (len(temp_min) == len(temp_max)):
raise ValueError("All inputs must be the same length")
# Model Calulations
Ra = compute_Ra(doy, latitude)
T_avg = (temp_min + temp_max)/2
PET = 0.0023 * Ra * (T_avg + 17.8) * (temp_max - temp_min)**0.5
# Ensures non-negative evapotranspiration
PET = np.clip(PET, 0.0, None)
return PET
[docs]
def penman_monteith(
temp_min: np.ndarray,
temp_max: np.ndarray,
RH_min: np.ndarray,
RH_max: np.ndarray,
p_min: np.ndarray,
p_max: np.ndarray,
wind_speed: np.ndarray,
doy: np.ndarray,
latitude: float,
altitude: float,
wind_height: int = 1.5,
) -> np.ndarray:
"""
PComputer evapotranspiration using the penman-monteith model
Args:
temp_min: The minimum daily temperature (°C)
temp_max: The maximum daily temperature (°C)
RH_min: The minimum daily relative humidity (range: 0.0-1.0)
RH_max: The maximum daily relative humidity (range: 0.0-1.0)
p_min: The minimum daily atmospheric pressure in Pa
p_max: The maximum daily atmospheric pressure in Pa
wind_speed: The average daily wind speed in meters per second
doy: Day of year (0-365) where January 1st is 0 and 365
latitude: Latitude in decimal degress. Where the northern hemisphere is
positive and the southern hemisphere is negative
altitude: Altitude of the location in meters
wind_height: Height of measurment for wind speed
Returns:
The Hargreaves models predictions for evapotranspiration clipped to
a minimum of zero
"""
# Ensure parameter saftey
if not isinstance(temp_min, float) and not (len(temp_min) == len(temp_max) == len(RH_min)\
== len(RH_max) == len(wind_speed) == len(wind_speed)):
raise ValueError("All inputs must be the same length")
temp_avg = (temp_min + temp_max)/2
atm_pressure = (p_min+p_max)/2 # Can be also obtained from weather station
Cp = 0.001013; # Approx. 0.001013 for average atmospheric conditions
gamma = (Cp * atm_pressure) / (0.622 * 2.45) # Approx. 0.000665
# Wind speed Adjustment
wind_speed_2m = wind_speed * (4.87 / np.log((67.8 * wind_height) - 5.42)) # Eq. 47, FAO-56 wind height in [m]
# Calculates air humidity and vapor pressure
delta = 4098 * (0.6108 * np.exp(17.27 * temp_avg / (temp_avg + 237.3)))
delta = delta / (temp_avg + 237.3)**2
e_temp_max = 0.6108 * np.exp(17.27 * temp_max / (temp_max + 237.3)) # Eq. 11, //FAO-56
e_temp_min = 0.6108 * np.exp(17.27 * temp_min / (temp_min + 237.3))
e_saturation = (e_temp_max + e_temp_min) / 2
e_actual = (e_temp_min * (RH_max / 100) + e_temp_max * (RH_min / 100)) / 2
# Calculates solar radiation
solar_rad = compute_Ra(doy, latitude)
# Clear Sky Radiation: Rso (MJ/m2/day)
Rso = (0.75 + (2 * 10**-5) * altitude) * solar_rad # Eq. 37, FAO-56
# Rs/Rso = relative shortwave radiation (limited to <= 1.0)
alpha = 0.23 # 0.23 for hypothetical grass reference crop
Rns = (1 - alpha) * solar_rad # Eq. 38, FAO-56
sigma = 4.903 * 10**-9
maxTempK = temp_max + 273.16
minTempK = temp_min + 273.16
# Eq. 39, FAO-56
Rnl = sigma * (maxTempK**4 + minTempK**4)
Rnl = Rnl / 2 * (0.34 - 0.14 * np.sqrt(e_actual))
Rnl = Rnl * (1.35 * (solar_rad / Rso) - 0.35)
Rn = Rns - Rnl # Eq. 40, FAO-56
# Soil heat flux density
soil_heat_flux = 0 # Eq. 42, FAO-56 G = 0 for daily time steps [MJ/m2/day]
# ETo calculation
PET = 0.408 * delta * (solar_rad - soil_heat_flux) + gamma
PET = PET * (900 / (temp_avg + 273)) * wind_speed_2m * (e_saturation - e_actual)
PET = PET / (delta + gamma * (1 + 0.34 * wind_speed_2m))
# Ensures non-negative evapotranspiration
PET = np.clip(PET, 0.0, None)
return np.round(PET,2)
[docs]
def EvapoTranspiration_to_df(
df: pd.DataFrame,
model_name: str,
temp_min: Optional[np.ndarray] = None,
temp_max: Optional[np.ndarray] = None,
RH_min: Optional[np.ndarray] = None,
RH_max: Optional[np.ndarray] = None,
wind_speed: Optional[np.ndarray] = None,
p_min: Optional[np.ndarray] = None,
p_max: Optional[np.ndarray] = None,
doy: Optional[np.ndarray] = None,
latitude: Optional[float] = None,
altitude: Optional[float] = None,
wind_height: int = 1.5
) -> pd.DataFrame:
"""
Adds daily evapotranspiration and its cumulative sum to the given DataFrame
using a specified model and its required parameters.
Args:
df: The DataFrame to add the evapotranspiration data to
model_name: Name of the model to use. Options:
["dalton", "penman", "romanenko", "jensen_haise", "hargreaves", "penman_monteith"]
temp_min: The minimum daily temperature (°C)
temp_max: The maximum daily temperature (°C)
RH_min: The minimum daily relative humidity (range: 0.0-1.0)
RH_max: The maximum daily relative humidity (range: 0.0-1.0)
wind_speed: The average daily wind speed in meters per second
p_min: The minimum daily atmospheric pressure in Pa
p_max: The maximum daily atmospheric pressure in Pa
doy: Day of year (0-365) where January 1st is 0 and 365
latitude: Latitude in decimal degress. Where the northern hemisphere is
positive and the southern hemisphere is negative
altitude: Altitude of the location in meters
wind_height: Height of measurement for wind speed in meters
"""
model_name = model_name.lower()
if model_name == "dalton":
pet = dalton(temp_min, temp_max, RH_min, RH_max, wind_speed)
elif model_name == "penman":
pet = penman(temp_min, temp_max, RH_min, RH_max, wind_speed)
elif model_name == "romanenko":
pet = romanenko(temp_min, temp_max, RH_min, RH_max)
elif model_name == "jensen_haise":
pet = jensen_haise(temp_min, temp_max, doy, latitude)
elif model_name == "hargreaves":
pet = hargreaves(temp_min, temp_max, doy, latitude)
elif model_name == "penman_monteith":
pet = penman_monteith(temp_min, temp_max, RH_min, RH_max, p_min, p_max,
wind_speed, doy, latitude, altitude, wind_height)
else:
raise ValueError(f"Unknown model name '{model_name}'. Must be one of: "
"['dalton', 'penman', 'romanenko', 'jensen_haise', 'hargreaves', 'penman_monteith'].")
df[LABELS['evapotranspiration'] + "-" + model_name] = pet
# Rain/Runoff Models
[docs]
def model_runoff(precip: np.ndarray, cn: int = 75) -> pd.DataFrame:
'''
Uses Curve Number to estimate runoff from rainfall
Args:
precip: The daily amount of precipitation in millimeters
cn: The curve number to use in the calculation
Returns:
The estimated runoff
'''
# Convert precitation to inches
precip_inch = precip / 25.4
# Model Calulations
runoff = np.zeros_like(precip_inch)
S02 = 1000 / cn - 10
S005 = 1.33 * S02**1.15
Lambda = 0.05
Ia = S005 * Lambda
idx = precip_inch > Ia
runoff[idx] = (precip_inch[idx] - Ia)**2 / (precip_inch[idx] - Ia + S005)
return runoff * 25.4 # Convert runoff back to millimeters
[docs]
def rainfall_runoff_to_df(df: pd.DataFrame, cn: int = 75):
"""
Adds rainfall sum, runoff sum, and runoff to the DataFrame
Args:
df: The DataFrame containing the rainfall data
cn: The curve number to be used in the runoff calculation
"""
global LABELS
# Check Parameters
if LABELS['rain'] not in df.columns:
raise ValueError(f"{LABELS['rain']} was not found in the df. Please check your rain label.")
# Add data to the df
df[LABELS["rain_sum"]] = df[LABELS["rain"]].cumsum()
df[LABELS["runoff"]] = model_runoff(df[LABELS["rain"]], cn=cn)
df[LABELS["runoff_sum"]] = df[LABELS["runoff"]].cumsum()
# Fletcher's Functions
[docs]
def match_weather_datetime(
weather_dt: np.ndarray,
data_dt: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Matches each datetime in `data_datetime_col` to the closest datetime
in `weather_datetime_col`.
Args:
weather_dt: The np array of actual weather date_times
data_dt: The np array of date_times to find the closest match for
Returns:
A tuple of (original times, matched weather times, matched indices, time differences)
"""
indices = np.searchsorted(weather_dt, data_dt, side="left")
indices = np.minimum(indices, len(weather_dt) - 1)
for i in range(len(indices)):
if indices[i] > 0:
before = weather_dt[indices[i] - 1]
after = weather_dt[indices[i]]
if abs(data_dt[i] - before) < abs(data_dt[i] - after):
indices[i] -= 1
matched_times = weather_dt[indices]
diffs = np.abs(data_dt - matched_times)
return data_dt, matched_times, indices, diffs
[docs]
def get_weather_data_from_cols(
weather_df: pd.DataFrame,
weather_cols: list[str],
indices: np.ndarray
) -> dict[str, list[float]]:
"""
Extracts values from weather columns at given matched indices.
Args:
weather_df: The DataFrame containing weather data
weather_cols: The key names to be used in the dictionary
indices: The indices in the DataFrame to be used in the dictionary
Returns:
A dictionary of {column_name: values}.
"""
return {
col: [weather_df[col].iloc[i] for i in indices]
for col in weather_cols
}
# Growing Degree Days
[docs]
def model_gdd(
temp_avg: np.ndarray,
temp_base: float,
temp_opt: Optional[float] = None,
temp_upper: Optional[float] = None,
duration_time: float = 1.0
) -> np.ndarray:
"""
Model how many growing degree days have passed
Models Growing Degree days using a minimum base temperature, an optional
optimal temperature, an optional maximum growing temperature, and
the average temperature over the recorded durations
Args:
temp_avg: The average temperature over the duration_time (°C)
temp_base: The minimum temperature a given crop will grow at (°C)
temp_opt: The optimal temperature a given crop will grow (°C),
above this temperature growing will slow linearly
temp_upper: The maximum temperature a given crop will grow (°C)
duration_time: The number of days that each temp_avg represents
Returns:
An np.ndarray with the calculated Growing Degree Days for each daily temperature
"""
# Validate parameter input
if(duration_time <= 0):
raise ValueError("The time duration must be positive")
if temp_avg is None or (not np.isscalar(temp_avg) and len(temp_avg) == 0):
raise ValueError("You must provide temperature averages, None provided")
if temp_opt is None and temp_upper is not None or\
temp_upper is None and temp_opt is not None:
raise ValueError("You must provide both upper and optimal temperatures or neither")
# Ensure the temp_avg is a numpy array
temp_avg = np.asarray(temp_avg)
# Simply GDD calculation
if temp_opt is None and temp_upper is None:
gdd_days = (temp_avg - temp_base)*duration_time
return np.maximum(0, gdd_days)
# Initialize GDD array
gdd = np.zeros_like(temp_avg)
# Vectorized masks
below_opt = temp_avg <= temp_opt
above_opt = temp_avg > temp_opt
# Compute GDD where temp < temp_opt
gdd[below_opt] = np.maximum( 0, (temp_avg[below_opt] - temp_base) * duration_time)
# Compute GDD where temp >= temp_opt
gdd_max = (temp_opt - temp_base) * duration_time
gdd[above_opt] = np.maximum(0,
gdd_max * (temp_upper - temp_avg[above_opt]) / (temp_upper - temp_opt),)
return gdd
[docs]
def gdd_to_df(
df: pd.DataFrame,
temp_avg: np.ndarray,
temp_base: float,
temp_opt: Optional[float] = None,
temp_upper: Optional[float] = None,
duration_time: float = 1
):
"""
Models Growing Degree days using a minimum base temperature, an optional
optimal temperature, an optional maximum growing temperature, and
the average temperature over the recorded durations.
Adds the Growing Degree days and cummulative GDD to the given DataFrame
Args:
df: The DataFrame to add the GDD data to
temp_avg: The average temperature over the duration_time (°C)
temp_base: The minimum temperature a given crop will grow at (°C)
temp_optimal: The optimal temperature a given crop will grow (°C),
above this temperature growing will slow linearly
temp_upper: The maximum temperature a given crop will grow (°C)
duration_time: The number of days that each temp_avg represents
"""
global LABELS
# Calculate gdd
gdd = model_gdd(temp_avg, temp_base, temp_opt, temp_upper, duration_time)
df[LABELS['gdd']] = gdd
df[LABELS['gdd_sum']] = gdd.cumsum()
# Photo Period Tools
[docs]
def photoperiod_at_latitude(latitude: float, doy: np.ndarray):
"""
Computes photoperiod for a given latitude and day of year. Not accurate near polar regions.
Args:
latitude: Latitude in decimal degress. Where the northern hemisphere is
positive and the southern hemisphere is negative
doy: np.ndarray of the days of year (0-365) where January 1st is 0 and 365
Returns:
Photoperiod, daylight hours, at the given latitude for the given days.
The angle of the sum below the horizon.
The zenithal distance of the sun in degrees
The mean anomaly of the sun
The declination of the sun in degrees
Lambda
Delta
"""
# Convert latitude to radians
latitude = np.radians(latitude)
# Angle of the sun below the horizon. Civil twilight is -4.76 degrees.
light_intensity = 2.206 * 10**-3
B = -4.76 - 1.03 * np.log(light_intensity) # Eq. [5].
# Zenithal distance of the sun in degrees
alpha = np.radians(90 + B) # Eq. [6]. Value at sunrise and sunset.
# Mean anomaly of the sun. It is a convenient uniform measure of
# how far around its orbit a body has progressed since pericenter.
M = 0.9856*doy - 3.251 # Eq. [4].
# Declination of sun in degrees
Lambda = M + 1.916*np.sin(np.radians(M)) + 0.020*np.sin(np.radians(2*M)) + 282.565 # Eq. [3]. Lambda
C = np.sin(np.radians(23.44)) # 23.44 degrees is the orbital plane of Earth around the Sun
delta = np.arcsin(C*np.sin(np.radians(Lambda))) # Eq. [2].
# Calculate daylength in hours, defining sec(x) = 1/cos(x)
P = 2/15 * np.degrees( np.arccos( np.cos(alpha) * (1/np.cos(latitude)) * (1/np.cos(delta)) - np.tan(latitude) * np.tan(delta) ) ) # Eq. [1].
return P, B, alpha, M, Lambda, np.degrees(delta)
[docs]
def photoperiod_on_day(latitude: np.ndarray, doys: np.ndarray):
"""
Computes photoperiod for a given near polar regions.
Args:
latitude: Latitude in decimal degress. Where the northern hemisphere is
positive and the southern hemisphere is negative
doys: The day of year (0-365) where January 1st is 0 and 365 to perform the calculation
Returns:
Photoperiod, daylight hours, for the given latitudes on the given day.
The angle of the sum below the horizon.
The zenithal distance of the sun in degrees
The mean anomaly of the sun
The declination of the sun in degrees
Lambda from the equation
Delta from the equation
References:
Keisling, T.C., 1982. Calculation of the Length of Day 1. Agronomy Journal, 74(4), pp.758-759.
"""
# Convert latitude to radians and convert shapes
latitude = np.radians(np.asarray(latitude)).reshape(-1, 1) # shape (N, 1)
doys = np.asarray(doys).reshape(1, -1)
# Angle of the sun below the horizon. Civil twilight is -4.76 degrees.
light_intensity = 2.206 * 10**-3
B = -4.76 - 1.03 * np.log(light_intensity) # Eq. [5].
# Zenithal distance of the sun in degrees
alpha = np.radians(90 + B) # Eq. [6]. Value at sunrise and sunset.
# Mean anomaly of the sun. It is a convenient uniform measure of
# how far around its orbit a body has progressed since pericenter.
M = 0.9856*doys - 3.251 # Eq. [4].
# Declination of sun in degrees
Lambda = M + 1.916*np.sin(np.radians(M)) + 0.020*np.sin(np.radians(2*M)) + 282.565 # Eq. [3]. Lambda
C = np.sin(np.radians(23.44)) # 23.44 degrees is the orbital plane of Earth around the Sun
delta = np.arcsin(C*np.sin(np.radians(Lambda))) # Eq. [2].
# Calculate daylength in hours, defining sec(x) = 1/cos(x)
P = 2/15 * np.degrees( np.arccos( np.cos(alpha) * (1/np.cos(latitude)) * (1/np.cos(delta)) - np.tan(latitude) * np.tan(delta) ) ) # Eq. [1].
return P, B, alpha, M, Lambda, np.degrees(delta)
# Soil Water Flow Functions
[docs]
def hydraulic_conductivity(
k_s: float,
psi_e: float,
theta: float,
theta_s: float,
b: float,
):
"""
Estimates the hydraulic conductivity of soil
Args:
k_s: The saturated conductivity of the soil (kg * s / m^3)
psi_e: The air entry water potential of the soil (J / kg)
theta: Is the volumetric water content of the soil (m^3 / m^3)
theta_s: Is the saturation water content of the soil (m^3 / m^3)
b: Is the exponent of moisture release parameter
Returns:
The calculated hydraulic conductivity of the soil given the parameters ()
"""
psi_m = psi_e* (theta / theta_s) ** (-b)
k_psi_m = k_s * (psi_e / psi_m) ** (2 + 3/b)
return k_psi_m
[docs]
def cummulative_water_infultration(
delta_theta: float,
K_avg: float,
psi_mi: float,
psi_mf: float,
time: int,
Nz: int=100
):
"""
Calculates the cummulative vertical water infultration.
Calculates the cummulative vertical water infultration for the given soil parameters.
Calculations are made for equally spaced times from [time/Nz to time]
Args:
delta_theta: The Volume Fraction of water
K_avg: The average hydraulic conductivity of the wet soil (kg * s /m^3)
psi_mi: The infultration boudary's water potential (J / kg)
psi_mf: The wetting front's water potentail (J / kg)
time: The length of time to calcute the cummulative water infultration (s)
Nz: The number of interpolated times to calculate
Returns:
A numpy array of length Nz representing the cummulative vertical water
infultration over time. Where the first value represents the
infultraiton after zero time, the second value represents the
infultration after time/Nz time, and the last vaule represents the
infultration after time.
"""
# Define Constants
WATER_DENSITY = 1000 # Units: Kg / m^3
LITTLE_G = 9.80665 # Units: m / s^2
if delta_theta < 0 or delta_theta > 1.0:
raise ValueError(f"The volume Fraction of water must be between [0,1], but {delta_theta} was provided")
# Calculate water infultration at each of the time
times = np.linspace(0, time, Nz)
water_inful = 2 * WATER_DENSITY * delta_theta * K_avg * (psi_mi - psi_mf) * times
water_inful = water_inful ** 0.5 + LITTLE_G * K_avg * times
return water_inful