Skip to content

Fix ssp correction #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Copyright (c) 2011-2025 Claudio Satriano <[email protected]>
- New config parameter `clipping_min_amplitude_ratio` to set a threshold for
trace amplitude below which the trace is not checked for clipping
- Use spectral interpolation to compute and apply station residuals
- Limit spectrum and residual to common frequency range when applying
correction before fitting

### Post-Inversion

Expand Down
6 changes: 2 additions & 4 deletions sourcespec/source_residuals.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections import defaultdict
from argparse import ArgumentParser
import numpy as np
from scipy.interpolate import interp1d
import matplotlib
import matplotlib.pyplot as plt
from sourcespec._version import get_versions
Expand Down Expand Up @@ -267,9 +266,8 @@ def compute_mean_residuals(residual_dict, min_spectra=20):
spec_interp.freq = freq_array
# spec_slice.data must exist, so we create it as zeros
spec_interp.data = np.zeros_like(freq_array)
# interpolate data_mag to the new frequency array
f = interp1d(spec.freq, spec.data_mag, bounds_error=False)
spec_interp.data_mag = f(freq_array)
# interpolate data and data_mag to the new frequency array
spec_interp.interp_data_to_new_freq(freq_array)
# norm is 1 where interpolated data_mag is not nan, 0 otherwise
norm = (~np.isnan(spec_interp.data_mag)).astype(float)
# Replace nan data_mag values with zeros for summation
Expand Down
220 changes: 220 additions & 0 deletions sourcespec/spectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

:copyright:
2012-2025 Claudio Satriano <[email protected]>
Kris Vanneste <[email protected]>
:license:
CeCILL Free Software License Agreement v2.1
(http://www.cecill.info/licences.en.html)
Expand All @@ -22,6 +23,7 @@
import math
import yaml
import numpy as np
from scipy.interpolate import interp1d
# Reduce logging loevel for h5py.
# For h5py, this has to be done before importing the module.
logging.getLogger('h5py').setLevel(logging.WARNING)
Expand Down Expand Up @@ -83,6 +85,54 @@ def __deepcopy__(self, memo):
return new_dict


def _find_nan_edges(data):
"""
Helper function to find NaN values at the beginning and end of a 1D array.

:param data: The data to check for NaN values.
:return: The indices of the NaN values at the beginning and end
of the data.
If there are no NaN values, an empty array is returned.
If all values are NaN, an array with all indices is returned.
"""
nan_mask = np.isnan(data)
# All NaNs
if not np.any(~nan_mask):
return np.arange(len(data))
# NaNs at the beginning
start_nan_idxs = np.where(nan_mask[:np.argmax(~nan_mask)])[0]
# NaNs at the end
end_nan_mask_reversed = nan_mask[::-1]
end_nan_idxs = np.where(
end_nan_mask_reversed[:np.argmax(~end_nan_mask_reversed)])[0]
end_nan_idxs = len(data) - 1 - end_nan_idxs
# Concatenate the two arrays. The result can be empty if there are
# no NaNs at the beginning or end.
return np.concatenate((start_nan_idxs, end_nan_idxs))


def _interpolate_data_to_new_freq(
data, freq, new_freq, fill_value='extrapolate', fix_negative=True):
"""
Helper function to interpolate data to a new frequency range.

:param data: The data to interpolate.
:param freq: The original frequency range.
:param new_freq: The new frequency range.
:param fill_value: The value to use for extrapolation.
If 'extrapolate', the data is extrapolated to the new frequency range.
See scipy.interpolate.interp1d for more details.
:param fix_negative: If True, negative values are replaced by the
minimum value of the original data.
:return: The interpolated data.
"""
f = interp1d(freq, data, fill_value=fill_value, bounds_error=False)
new_data = f(new_freq)
if fix_negative:
new_data[new_data <= 0] = np.min(data)
return new_data


class Spectrum():
"""
A class to handle amplitude spectra.
Expand Down Expand Up @@ -320,6 +370,176 @@ def freq_logspaced(self, value):
self.stats.delta_logspaced = 1.
self._freq_logspaced = value

def make_freq_logspaced(self, delta_logspaced=None):
"""
Create a logspaced frequency axis from the linear frequency axis.

If logspaced data already exist, it is reinterpolated to the new
logspaced frequencies.

:param delta_logspaced: The log10([Hz]) sample interval.
If None, it is computed from the linear frequency axis.
"""
if self.freq.size == 0:
raise ValueError('Frequency axis is empty')
if np.any(self.freq <= 0):
raise ValueError('Frequency axis must be positive')
log_freq = np.log10(self.freq)
if delta_logspaced is None:
log_df = log_freq[-1] - log_freq[-2]
else:
log_df = delta_logspaced
freq_logspaced =\
10**(np.arange(log_freq[0], log_freq[-1] + log_df, log_df))
# If logspaced frequencies already exist,
# reinterpolate the data to the new logspaced frequencies
if (
self.data_logspaced.size > 0
and self.freq_logspaced.size != freq_logspaced.size
):
self.interp_data_logspaced_to_new_freq(freq_logspaced)
self.freq_logspaced = freq_logspaced

def make_logspaced_from_linear(self, which='data'):
"""
Convert the linear data to logspaced data.

:param which: The data to convert.
One of 'data', 'data_mag' or 'both'.

:note: The logspaced frequency axis must exist.
"""
if which == 'data':
data = self.data
fix_negative = True
elif which == 'data_mag':
data = self.data_mag
fix_negative = False
elif which == 'both':
self.make_logspaced_from_linear('data')
self.make_logspaced_from_linear('data_mag')
return
else:
raise ValueError(
f'Invalid value for "which": {which}. '
'Must be one of "data", "data_mag" or "both".'
)
if self.freq.size == 0:
raise ValueError('Frequency axis is empty')
if data.size == 0:
raise ValueError('Data axis is empty')
if self.freq_logspaced.size == 0:
raise ValueError('Logspaced frequency axis is empty')
# Find NaN indexes at the beginning and end of the data
nan_idxs = _find_nan_edges(data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to nan_idxs = [] if you want to ignore this step.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claudio,

Thanks for the update. I will try to run it one of the coming days (Friday at the latest). Is that OK?
Kris

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! thanks!

# Do not extrapolate if there are NaN values at the beginning or end
fill_value = np.nan if nan_idxs.size else 'extrapolate'
data = np.delete(data, nan_idxs)
freq = np.delete(self.freq, nan_idxs)
# Reinterpolate data using log10 frequencies
data_logspaced = _interpolate_data_to_new_freq(
data, freq, self.freq_logspaced,
fill_value=fill_value, fix_negative=fix_negative
)
if which == 'data':
self.data_logspaced = data_logspaced
else:
self.data_mag_logspaced = data_logspaced

def make_linear_from_logspaced(self, which='data_logspaced'):
"""
Convert the logspaced data to linear data.

:param which: The data to convert.
One of 'data_logspaced', 'data_mag_logspaced' or 'both'.

:note: The linear frequency axis must exist.
"""
if which == 'data_logspaced':
data_logspaced = self.data_logspaced
fix_negative = True
elif which == 'data_mag_logspaced':
data_logspaced = self.data_mag_logspaced
fix_negative = False
elif which == 'both':
self.make_linear_from_logspaced('data_logspaced')
self.make_linear_from_logspaced('data_mag_logspaced')
return
else:
raise ValueError(
f'Invalid value for "which": {which}. '
'Must be one of "data_logspaced", "data_mag_logspaced" or '
'"both".'
)
if self.freq_logspaced.size == 0:
raise ValueError('Logspaced frequency axis is empty')
if self.data_logspaced.size == 0:
raise ValueError('Data logspaced axis is empty')
if self.freq.size == 0:
raise ValueError('Frequency axis is empty')
# Reinterpolate data using linear frequencies
data = _interpolate_data_to_new_freq(
data_logspaced, self.freq_logspaced, self.freq,
fix_negative=fix_negative
)
if which == 'data_logspaced':
self.data = data
else:
self.data_mag = data

def interp_data_to_new_freq(self, new_freq, fill_value='extrapolate'):
"""
Interpolate the linear data to a new frequency axis.

:param new_freq: The new frequency axis.
:param fill_value: The value to use for extrapolation.
If 'extrapolate', the data is extrapolated to the new frequency
range. See scipy.interpolate.interp1d for more details.
"""
if self.freq.size == 0:
raise ValueError('Frequency axis is empty')
if self.data.size == 0:
raise ValueError('Data axis is empty')
self.data = _interpolate_data_to_new_freq(
self.data, self.freq, new_freq,
fill_value=fill_value,
fix_negative=True
)
if self.data_mag.size:
self.data_mag = _interpolate_data_to_new_freq(
self.data_mag, self.freq, new_freq,
fill_value=fill_value,
fix_negative=False
)
self.freq = new_freq

def interp_data_logspaced_to_new_freq(
self, new_freq, fill_value='extrapolate'):
"""
Interpolate the logspaced data to a new frequency axis.

:param new_freq: The new frequency axis.
:param fill_value: The value to use for extrapolation.
If 'extrapolate', the data is extrapolated to the new frequency
range. See scipy.interpolate.interp1d for more details.
"""
if self.freq_logspaced.size == 0:
raise ValueError('Logspaced frequency axis is empty')
if self.data_logspaced.size == 0:
raise ValueError('Data logspaced axis is empty')
self.data_logspaced = _interpolate_data_to_new_freq(
self.data_logspaced, self.freq_logspaced, new_freq,
fill_value=fill_value,
fix_negative=True
)
if self.data_mag_logspaced.size:
self.data_mag_logspaced = _interpolate_data_to_new_freq(
self.data_mag_logspaced, self.freq_logspaced, new_freq,
fill_value=fill_value,
fix_negative=False
)
self.freq_logspaced = new_freq

def copy(self):
"""Return a copy of the spectrum."""
spec_copy = Spectrum()
Expand Down
58 changes: 27 additions & 31 deletions sourcespec/ssp_build_spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from scipy.integrate import cumtrapz
except ImportError:
from scipy.integrate import cumulative_trapezoid as cumtrapz
from scipy.interpolate import interp1d
from obspy.core import Stream
from sourcespec.spectrum import Spectrum, SpectrumStream
from sourcespec.ssp_setup import ssp_exit
Expand Down Expand Up @@ -349,38 +348,28 @@ def _displacement_to_moment(stats, config):


def _smooth_spectrum(spec, smooth_width_decades=0.2):
"""Smooth spectrum in a log10-freq space."""
"""
Smooth spectrum in a log10-freq space.

This function also adds to the original spectrum the log-spaced
frequencies and data, which are used in the inversion.
"""
# 1. Generate log10-spaced frequencies
freq = spec.freq
_log_freq = np.log10(freq)
# frequencies in logarithmic spacing
log_df = _log_freq[-1] - _log_freq[-2]
freq_logspaced =\
10**(np.arange(_log_freq[0], _log_freq[-1] + log_df, log_df))
# 2. Reinterpolate data using log10 frequencies
# make sure that extrapolation does not create negative values
f = interp1d(freq, spec.data, fill_value='extrapolate')
data_logspaced = f(freq_logspaced)
data_logspaced[data_logspaced <= 0] = np.min(spec.data)
spec.make_freq_logspaced()
# 2. Interpolate data to log10-spaced frequencies
spec.make_logspaced_from_linear()
# 3. Smooth log10-spaced data points
log_df = spec.stats.delta_logspaced
npts = max(1, int(round(smooth_width_decades / log_df)))
data_logspaced = smooth(data_logspaced, window_len=npts)
spec.data_logspaced = smooth(spec.data_logspaced, window_len=npts)
# 4. Reinterpolate to linear frequencies
# make sure that extrapolation does not create negative values
f = interp1d(freq_logspaced, data_logspaced, fill_value='extrapolate')
data = f(freq)
data[data <= 0] = np.min(spec.data)
spec.data = data
spec.make_linear_from_logspaced()
# 5. Optimize the sampling rate of log spectrum,
# based on the width of the smoothing window
# make sure that extrapolation does not create negative values
log_df = smooth_width_decades / 5
freq_logspaced =\
10**(np.arange(_log_freq[0], _log_freq[-1] + log_df, log_df))
spec.freq_logspaced = freq_logspaced
data_logspaced = f(freq_logspaced)
data_logspaced[data_logspaced <= 0] = np.min(spec.data)
spec.data_logspaced = data_logspaced
# note: make_freq_logspaced() will reinterpolate the log-spaced data
# to the new log-spaced frequencies
spec.make_freq_logspaced(log_df)


def _build_spectrum(config, trace):
Expand Down Expand Up @@ -424,7 +413,7 @@ def _build_spectrum(config, trace):
# store coeff to correct back data in displacement units
# for radiated_energy()
spec.stats.coeff = coeff
# smooth
# smooth spectrum. This also creates log-spaced frequencies and data
try:
_smooth_spectrum(spec, config.spectral_smooth_width_decades)
except ValueError as e:
Expand Down Expand Up @@ -490,7 +479,15 @@ def _build_weight_from_inv_frequency(spec, power=0.25):

def _build_weight_from_ratio(spec, specnoise, smooth_width_decades):
weight = spec.copy()
weight.data /= specnoise.data
try:
weight.data /= specnoise.data
except ValueError as e:
logger.error(
f'{spec.id}: Error building weight from noise: {str(e)}'
)
ssp_exit(1)
# replace NaN values with a small value
weight.data[np.isnan(weight.data)] = 1e-9
# save signal-to-noise ratio before log10, smoothing, and normalization
weight.snratio = weight.data.copy()
# The inversion is done in magnitude units,
Expand Down Expand Up @@ -521,8 +518,7 @@ def _build_weight_from_noise(config, spec, specnoise):
weight = _build_weight_from_ratio(
spec, specnoise, config.spectral_smooth_width_decades)
# interpolate to log-frequencies
f = interp1d(weight.freq, weight.data, fill_value='extrapolate')
weight.data_logspaced = f(weight.freq_logspaced)
weight.make_logspaced_from_linear()
weight.data_logspaced /= np.max(weight.data_logspaced)
# Make sure weight is positive
weight.data_logspaced[weight.data_logspaced <= 0] = 0.001
Expand Down Expand Up @@ -732,7 +728,7 @@ def _build_signal_and_noise_spectral_streams(
for specnoise in specnoise_st:
specnoise.data_mag = moment_to_mag(specnoise.data)
# apply station correction if a residual file is specified in config
spec_st = station_correction(spec_st, config)
station_correction(spec_st, config)
return spec_st, specnoise_st


Expand Down
Loading