#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DOI: 10.5281/zenodo.15102100
# Full license can be found in License.md
#
# DISTRIBUTION STATEMENT A: Approved for public release. Distribution is
# unlimited.
# -----------------------------------------------------------------------------
"""Functions used for fitting functions to intensity data."""
import numpy as np
from scipy.optimize import leastsq
from scipy import stats
from pyIntensityFeatures.utils import checks
from pyIntensityFeatures.utils import distributions
[docs]
def estimate_peak_widths(data, data_loc, peak_inds, peak_mags):
"""Estimate the full width at half max of the peaks in a curve.
Parameters
----------
data : array-like
Input data (y-axis)
data_loc : array-like
Input data location (x-axis)
peak_inds : array-like
Indexes of the peaks
peak_mags : array-like
Magnitude of the peaks
Returns
-------
peak_sigmas : list-like
FWHM values for each peak, or the closest estimate possible
Notes
-----
Differs from Longden, et al. (2010) function by not assuming a fixed data
location increment of 1.0 degrees.
"""
# Ensure the input is array-like
data = np.asarray(data)
data_loc = np.asarray(data_loc)
peak_inds = np.asarray(peak_inds)
peak_mags = np.asarray(peak_mags)
# Get the half-max for each peak
half_max = 0.5 * peak_mags
peak_sigmas = list()
for i, ipeak in enumerate(peak_inds):
# Step through all values of the curve data (`data`) with lower indices
# than this peak until either the half-max threshold or another peak is
# reached
reached = False
good_down = False
j = ipeak
while not reached and j > 0:
if (data[j] >= half_max[i]) and (data[j - 1] <= half_max[i]):
reached = True
fwhm_loc = data_loc[j - 1] + (
data_loc[j] - data_loc[j - 1]) * (
half_max[i] - data[j - 1]) / (data[j] - data[j - 1])
width_down = data_loc[ipeak] - fwhm_loc
good_down = True
else:
if data[j] < data[j - 1]:
# Data is no longer decreasing. Since the correct width was
# not reached before finding the desired half-max value,
# set an approximate value
reached = True
width_down = data_loc[ipeak] - data_loc[j - 1]
j -= 1
if not good_down and not reached:
# Reached edge of data before defining the width
width_down = data_loc[ipeak] - data_loc[j]
# Step through all values of the curve data (`data`) with higher
# indices than this peak until either the half-max threshold or another
# peak is reached
reached = False
good_up = False
j = ipeak
while not reached and j < len(data) - 1:
if (data[j] >= half_max[i]) and (data[j + 1] <= half_max[i]):
reached = True
fwhm_loc = data_loc[j + 1] + (
data_loc[j] - data_loc[j + 1]) * (
half_max[i] - data[j + 1]) / (data[j] - data[j + 1])
width_up = fwhm_loc - data_loc[ipeak]
good_up = True
else:
if data[j] < data[j + 1]:
# Data is no longer decreasing. Since the correct width was
# not reached before finding the desired half-max value,
# set an approximate value
reached = True
width_up = data_loc[j + 1] - data_loc[ipeak]
j += 1
if not good_up and not reached:
# Reached edge of data before defining the width
width_up = data_loc[j] - data_loc[ipeak]
# Calculate the peak sigmas using logic based on whether or not the
# HWHM was found on each leg
if good_up and good_down:
peak_sigmas.append((width_up + width_down) / checks.fwhm_const)
elif good_up:
peak_sigmas.append((2.0 * width_up) / checks.fwhm_const)
elif good_down:
peak_sigmas.append((2.0 * width_down) / checks.fwhm_const)
else:
peak_sigmas.append(2.0 * min([width_up, width_down])
/ checks.fwhm_const)
return peak_sigmas
[docs]
def gauss_quad_err(params, xvals, yvals, weights):
"""Calculate the error of a quadriatic Gaussian fit.
Parameters
----------
params : set
Set of parameters used by `mult_gauss_quad`
xvals : float or array-like
Location values
yvals : float or array-like
Intensity values
weights : float or array-like
Weights for each data value
Returns
-------
err : float or array-like
Weighted error of the fit defined by `params`
See Also
--------
utils.distributions.mult_gauss_quad
"""
# Define the error function to fit
err = (distributions.mult_gauss_quad(xvals, params) - yvals) * weights
return err
[docs]
def get_fitting_params(mlat_bins, ilats, ilt, mean_intensity, num_gauss=3):
"""Find the parameters for a Gaussian fit with a quadratic background.
Parameters
----------
mlat_bins : np.array
Magnetic latitude of output bin centres
ilats : np.array
Indices for magnetic latitudes with valid intensity values
ilt : int
Index for the magnetic local time
mean_intensity : np.array
2D array of mean intensity values
num_gauss : int
Maximum number of Gaussians to fit (default=3)
Returns
-------
params : list
List of the parameters needed to form a mult-Gaussian fit with a
quadratic background.
ipeaks : list
List containing indexes of the normalized intensity peaks for the
2D `mean_intesity` array down-selected by [`ilats`, `ilt`].
"""
# Perform a first-order polynomial fit to the data to obtain estimates for
# the background levels in the intensity profile. The first term is the
# slope and the second term is the intercept.
bg = np.polyfit(mlat_bins[ilats], mean_intensity[ilats, ilt], 1)
# Normalize the intensity curve
norm_intensity = (mean_intensity[ilats, ilt]
- mean_intensity[ilats, ilt].min()) / (
mean_intensity[ilats, ilt].max()
- mean_intensity[ilats, ilt].min())
# Find the main peak and its characteristics
ipeaks = [norm_intensity.argmax()]
peak_sigmas = estimate_peak_widths(norm_intensity, mlat_bins[ilats], ipeaks,
[norm_intensity[ipeaks[0]]])
# Locate any additional peaks, up to a desired maximum
mlt_gauss = num_gauss
while len(ipeaks) < mlt_gauss:
# Get a Gaussian for the prior peak
prior_gauss = distributions.gauss(
mlat_bins[ilats], norm_intensity[ipeaks[-1]],
mlat_bins[ilats][ipeaks[-1]], peak_sigmas[-1], 0.0)
# Remove the gaussian fit from the data
norm_intensity -= prior_gauss
# Get the next peak
inew = norm_intensity.argmax()
# Test to see if the new peak is significant
if not np.any((mlat_bins[ilats][inew] > mlat_bins[ilats][ipeaks]
- np.asarray(peak_sigmas) * 2.0)
& (mlat_bins[ilats][inew] < mlat_bins[ilats][ipeaks]
+ np.asarray(peak_sigmas) * 2.0)):
ipeaks.append(inew)
peak_sigmas.extend(estimate_peak_widths(
norm_intensity, mlat_bins[ilats], [ipeaks[-1]],
[norm_intensity[ipeaks[-1]]]))
else:
mlt_gauss = len(ipeaks)
# Use the peak information to set the Gaussian fitting parameters
params = [bg[1], bg[0], 0.0]
for i, ipeak in enumerate(ipeaks):
params.extend([mean_intensity[ilats, ilt][ipeak],
mlat_bins[ilats][ipeak], peak_sigmas[i]])
return params, ipeaks
[docs]
def get_gaussian_func_fit(mlat_bins, mlt_bins, mean_intensity, std_intensity,
num_intensity, num_gauss=3, min_num=3,
min_intensity=0.0, min_lat_perc=70.0):
"""Fit intensity data using least-squares minimization in MLT bins.
Parameters
----------
mlat_bins : np.array
Magnetic latitude of output bin centres
mlt_bins : np.array
Magnetic local time of output bin centres
mean_intensity : np.array
2D array of mean intensity values
std_intensity : np.array
2D array of intensity standard deviations
num_intensity : np.array
2D array of intensity counts per bin
num_gauss : int
Maximum number of Gaussians to fit (default=3)
min_num : int
Minimum number of samples contributing to the intensity mean
(default=3)
min_intensity : float
Minimum intensity magnitude in Rayleighs (default=0.0)
min_lat_perc : int
Minimum percentage of latitude bins needed to define an intensity
function (default=70.0)
Returns
-------
func_params : list-like
Set of parameters used by `mult_gauss_quad` for each MLT bin that
defines the mean intensity as a function of magnetic latitude or set
to a string output with a reason if no successful fit was performed.
fit_cov : list-like
Covariance matrix for each fit
fit_pearsonr : list-like
Pearson r-value for correlation between the observations and the fit
fit_pearsonp : list-like
Pearson p-value for correlation between the observations and the fit
num_peaks : list-like
Number of peaks used in the fit
See Also
--------
utils.distributions.mult_gauss_quad
"""
# Initialize the output
func_params = list()
fit_cov = list()
fit_pearsonr = list()
fit_pearsonp = list()
num_peaks = list()
# Get the percentage multiplier
perc_mult = 100.0 / len(mlat_bins)
# Loop through each MLT bin
for ilt, mlt in enumerate(mlt_bins):
ilats = np.where(np.isfinite(mean_intensity[:, ilt])
& (num_intensity[:, ilt] >= min_num)
& (mean_intensity[:, ilt] >= min_intensity))[0]
if len(ilats) * perc_mult >= min_lat_perc:
# Get the peak information to set the Gaussian fitting parameters
params, ipeaks = get_fitting_params(mlat_bins, ilats, ilt,
mean_intensity, num_gauss)
# Find the desired Gaussian + Quadratic fit using least-squares
# if there are enough latitudes to provide a fit
lsq_args = (mlat_bins[ilats], mean_intensity[ilats, ilt],
1.0 / std_intensity[ilats, ilt])
num_peaks.append(len(ipeaks))
if len(params) <= len(ilats):
lsq_result = leastsq(gauss_quad_err, params, args=lsq_args,
full_output=True)
# Evaluate the least squares output and save the results
if lsq_result[-1] in [1, 2, 3, 4]:
gauss_out = distributions.mult_gauss_quad(
mlat_bins[ilats], lsq_result[0])
fmask = np.isfinite(gauss_out)
if fmask.sum() > 3:
pres = stats.pearsonr(
mean_intensity[ilats, ilt][fmask],
gauss_out[fmask])
func_params.append(lsq_result[0])
# SciPy versions below 1.9.0 will not work
fit_pearsonr.append(pres.statistic)
fit_pearsonp.append(pres.pvalue)
fit_cov.append(lsq_result[1])
found_fit = True
else:
found_fit = False
else:
found_fit = False
else:
found_fit = False
lsq_result = ["insufficient mlat coverage for expected peaks",
-1]
if not found_fit:
while not found_fit and len(ipeaks) > 1:
# Try reducing the number of peaks to obtain a valid fit
ipeaks.pop()
params = list(np.array(params)[:-3])
if len(params) <= len(ilats):
lsq_result = leastsq(gauss_quad_err, params,
args=lsq_args, full_output=True)
num_peaks[-1] = len(ipeaks)
if lsq_result[-1] in [1, 2, 3, 4]:
gauss_out = distributions.mult_gauss_quad(
mlat_bins[ilats], lsq_result[0])
gmask = np.isfinite(gauss_out)
if sum(gmask) > 3:
pres = stats.pearsonr(
mean_intensity[ilats, ilt][gmask],
gauss_out[gmask])
found_fit = True
func_params.append(lsq_result[0])
fit_pearsonr.append(pres.statistic)
fit_pearsonp.append(pres.pvalue)
fit_cov.append(lsq_result[1])
if not found_fit:
func_params.append(lsq_result[-2])
fit_cov.append(None)
fit_pearsonr.append(None)
fit_pearsonp.append(None)
else:
func_params.append("insufficient mlat coverage ({:} < {:})".format(
len(ilats), min_lat_perc / perc_mult))
fit_cov.append(None)
fit_pearsonr.append(None)
fit_pearsonp.append(None)
num_peaks.append(0)
return func_params, fit_cov, fit_pearsonr, fit_pearsonp, num_peaks