"""
Tools for processing obspy.Stream or obspy.Trace objects
Used for preprocessing data through filtering and tapering, zero padding etc.
Also contains tools for synthetic traces such as source time function
convolutions
"""
import numpy as np
from pyatoa import logger
[docs]
def apply_filter(st, min_period=None, max_period=None, min_freq=None,
max_freq=None, corners=2, zerophase=True, **kwargs):
"""
Choose the appropriate filter depending on the ranges given.
Either periods or frequencies can be given. Periods will be prioritized.
Uses Butterworth filters by default.
Filters the stream in place. Kwargs passed to filter functions.
:type st: obspy.core.stream.Stream
:param st: stream object to be filtered
:type min_period: float
:param min_period: minimum filter bound in units of seconds
:type max_period: float
:param max_period: maximum filter bound in units of seconds
:type min_freq: float
:param min_freq: optional minimum filter bound in units of Hz, will be
overwritten by `max_period` if given
:type max_freq: float
:param max_freq: optional maximum filter bound in units of Hz, will be
overwritten by `min_period` if given
:type corners: int
:param corners: number of filter corners to be passed to ObsPy
filter functions
:type zerophase: bool
:param zerophase: if True, run filter backwards and forwards to avoid
any phase shifting
:rtype: obspy.core.stream.Stream
:return: Filtered stream object
"""
if min_period is None and max_period is None:
logger.info(f"no filter bounds given, no filtering will be applied")
return st
# Ensure that the frequency and period bounds are the same
if not min_period and max_freq:
min_period = 1 / max_freq
if not max_period and min_freq:
max_period = 1 / min_freq
if not max_freq:
max_freq = 1/ min_period
if not min_freq:
min_freq = 1 / max_period
# Bandpass if both bounds given
if min_period and max_period:
st.filter("bandpass", corners=corners, zerophase=zerophase,
freqmin=min_freq, freqmax=max_freq, **kwargs)
logger.info(f"bandpass filter: {min_period} - {max_period}s w/ "
f"{corners} corners")
# Minimum period only == lowpass filter
elif min_period:
st.filter("lowpass", freq=max_freq, corners=corners,
zerophase=zerophase, **kwargs)
logger.info(f"lowpass filter: {min_period}s w/ {corners} corners")
# Maximum period only == highpass filter
elif max_period:
st.filter("highpass", freq=min_freq, corners=corners,
zerophase=zerophase, **kwargs)
logger.info(f"highpass filter: {max_period}s w/ {corners} corners")
return st
[docs]
def taper_time_offset(st, taper_percentage=0.05, time_offset_sec=0):
"""
Taper the leading edge of the waveform. If a time offset is given,
e.g. 20s before the event origin time (T_0), taper all the way up from
T=0 to T=T_0, to ensure that there are no impulse-like signals prior to the
event origin.
:type st: obspy.core.stream.Stream
:param st: Stream object to be tapered
:type taper_percentage: float
:param taper_percentage: default taper percentage
:type time_offset_sec: float
:param time_offset_sec: Any time offset between the start of the stream to
the event origin time. All time between these two points will be tapered
to reduce any signals prior to the event origin.
:rtype: obspy.core.stream.Stream
:return: tapered Stream object
"""
taper_amount = st[0].stats.npts * taper_percentage * st[0].stats.delta
if taper_amount > abs(time_offset_sec):
logger.warning("taper amount exceeds time offset, taper may affect "
"data if source receiver distance is short")
elif taper_amount < abs(time_offset_sec):
logger.info(f"adjusting taper to cover time offset {time_offset_sec}")
taper_percentage = (abs(time_offset_sec) /
st[0].stats.npts * st[0].stats.delta)
# Get rid of extra long period signals which may adversely affect processing
st.detrend("simple").taper(taper_percentage, side="left")
return st
[docs]
def zero_pad(st, pad_length_in_seconds, before=True, after=True):
"""
Zero pad the data of a stream, change the starttime to reflect the change.
Useful for if e.g. observed data starttime comes in later than synthetic.
:type st: obspy.stream.Stream
:param st: stream to be zero padded
:type pad_length_in_seconds: int
:param pad_length_in_seconds: length of padding front and back
:type before: bool
:param before: pad the stream before the origin time
:type after: bool
:param after: pad the stream after the last sample
:rtype st: obspy.stream.Stream
:return st: stream with zero padded data object
"""
pad_before, pad_after = 0, 0
st_pad = st.copy()
for tr in st_pad:
array = tr.data
pad_width = int(pad_length_in_seconds * tr.stats.sampling_rate)
# Determine if we should pad before or after
if before:
pad_before = pad_width
if after:
pad_after = pad_width
logger.debug(f"zero pad {tr.id} ({pad_before}, {pad_after}) samples")
# Constant value is default 0
tr.data = np.pad(array, (pad_before, pad_after), mode='constant')
tr.stats.starttime -= pad_length_in_seconds
logger.debug(f"new starttime {tr.id}: {tr.stats.starttime}")
return st_pad
[docs]
def trim_streams(st_a, st_b, precision=1E-3, force=None):
"""
Trim two streams to common start and end times,
Do some basic preprocessing before trimming.
Allows user to force one stream to conform to another.
Assumes all traces in a stream have the same time.
Prechecks make sure that the streams are actually different
:type st_a: obspy.stream.Stream
:param st_a: streams to be trimmed
:type st_b: obspy.stream.Stream
:param st_b: streams to be trimmed
:type precision: float
:param precision: precision to check UTCDateTime differences
:type force: str
:param force: "a" or "b"; force trim to the length of "st_a" or to "st_b",
if not given, trims to the common time
:rtype: tuple of obspy.stream.Stream
:return: trimmed stream objects in the same order as input
"""
# Check if the times are already the same
if st_a[0].stats.starttime - st_b[0].stats.starttime < precision and \
st_a[0].stats.endtime - st_b[0].stats.endtime < precision:
logger.debug(f"start and endtimes already match to {precision}")
return st_a, st_b
# Force the trim to the start and end times of one of the streams
if force:
if force.lower() == "a":
start_set = st_a[0].stats.starttime
end_set = st_a[0].stats.endtime
elif force.lower() == "b":
start_set = st_b[0].stats.starttime
end_set = st_b[0].stats.endtime
# Get starttime and endtime base on min values
else:
st_trimmed = st_a + st_b
start_set, end_set = 0, 1E10
for st in st_trimmed:
start_hold = st.stats.starttime
end_hold = st.stats.endtime
if start_hold > start_set:
start_set = start_hold
if end_hold < end_set:
end_set = end_hold
# Trim to common start and end times
st_a_out = st_a.copy()
st_b_out = st_b.copy()
for st in [st_a_out, st_b_out]:
st.trim(start_set, end_set)
# Trimming doesn't always make the starttimes exactly equal if the precision
# of the UTCDateTime object is set too high.
# Artificially shift the starttime of the streams iff the amount shifted
# is less than the sampling rate
for st in [st_a_out, st_b_out]:
for tr in st:
dt = start_set - tr.stats.starttime
if 0 < dt < tr.stats.sampling_rate:
logger.info(f"shifting {tr.id} starttime by {dt}s")
tr.stats.starttime = start_set
elif dt >= tr.stats.delta:
logger.warning(f"{tr.id} starttime is {dt}s greater than delta")
return st_a_out, st_b_out
[docs]
def match_npts(st_a, st_b, force=None):
"""
Resampling can cause sample number differences which will lead to failure
of some preprocessing or processing steps. This function ensures that `npts`
matches between traces by extending one of the traces with zeros.
A small taper is applied to ensure the new values do not cause
discontinuities.
Note:
its assumed that all traces within a single stream have the same `npts`
:type st_a: obspy.stream.Stream
:param st_a: one stream to match samples with
:type st_b: obspy.stream.Stream
:param st_b: one stream to match samples with
:type force: str
:param force: choose which stream to use as the default npts,
defaults to 'a', options: 'a', 'b'
:rtype: tuple (obspy.stream.Stream, obspy.stream.Stream)
:return: streams that may or may not have adjusted npts, returned in the
same order as provided
"""
# Assign the number of points, copy to avoid editing in place
if not force or force == "a":
npts = st_a[0].stats.npts
st_const = st_a.copy()
st_change = st_b.copy()
else:
npts = st_b[0].stats.npts
st_const = st_b.copy()
st_change = st_a.copy()
for tr in st_change:
diff = abs(tr.stats.npts - npts)
if diff:
logger.info(f"appending {diff} zeros to {tr.get_id()}")
tr.data = np.append(tr.data, np.zeros(diff))
# Ensure streams are returned in the correct order
if not force or force == "a":
return st_const, st_change
else:
return st_change, st_const
[docs]
def normalize(st_a, st_b, choice):
"""
Normalize amplitudes in traces to the absolute maximum amplitude of the
other, or normalize all traces so that their maximum value is a given value.
:type st_a: obspy.stream.Stream
:param st_a: one stream to match samples with
:type st_b: obspy.stream.Stream
:param st_b: one stream to match samples with
:type choice: str or int or float
:param choice: choose which stream to use as the default npts,
defaults to 'a', options: 'a', 'b' or a value that you want to set the
max amplitude to be equal to, e.g., 1
:rtype: tuple (obspy.stream.Stream, obspy.stream.Stream)
:return: streams that have been normalized based on `choice`
:raises NotImplementedError: If incorrect value of `choice` is provided
"""
if isinstance(choice, str):
if choice == "a":
st_const = st_a.copy()
st_change = st_b.copy()
elif choice == "b":
st_change = st_a.copy()
st_const = st_b.copy()
else:
raise NotImplementedError("normalize `choice` must be 'a' or 'b'")
for tr_const, tr_change in zip(st_const, st_change):
tr_change.data *= (abs(tr_const.max()) / abs(tr_change.max()) )
if choice == "a":
st_a_out = st_const
st_b_out = st_change
else:
st_a_out = st_change
st_b_out = st_const
elif isinstance(choice, (int, float)):
st_a_out = st_a.copy()
st_b_out = st_b.copy()
for tr_a, tr_b in zip(st_a, st_b):
tr_a.data *= (choice / abs(tr_a.max()))
tr_b.data *= (choice / abs(tr_b.max()))
else:
raise NotImplementedError(f"invalid choice {choice} for normalization")
return st_a_out, st_b_out
[docs]
def is_preprocessed(st, filter_only=True):
"""
Check to make sure a stream object has not yet been run through
preprocessing.
Assumes that a fresh stream will have no processing attribute in their
stats, or if they do, will not have been filtered
(getting cut waveforms from FDSN appends a 'trim' stat).
:type st: obspy.stream.Stream
:param st: stream to check processing on
:type filter_only: bool
:param filter_only: only check if the stream has been filtered, as other
processing steps (e.g., demeaning, rotating) will also lead to a
'processing' stat. Usually this is what you want to check as filtering
is one of the last steps in the processing chain.
:rtype: bool
:return: if preprocessing has occurred
"""
for tr in st:
if hasattr(tr.stats, "processing"):
if filter_only:
for processing in tr.stats.processing:
# A little hacky, but processing flag will have the str
# ..': filter(options'... to signify that a filter is applied
if "filter(options" in processing:
return True
else:
return bool(tr.stats.processing)
# If nothing found, return False
return False
[docs]
def stf_convolve(st, half_duration, source_decay=4., time_shift=None,
time_offset=None):
"""
Convolve function with a Gaussian window source time function.
Design follows Specfem3D Cartesian "comp_source_time_function.f90"
`hdur` given is `hdur`_Gaussian = hdur/SOURCE_DECAY_MIMIC_TRIANGLE
with SOURCE_DECAY_MIMIC_TRIANGLE ~ 1.68
This gaussian uses a strong decay rate to avoid non-zero onset times, while
still miicking a triangle source time function
:type st: obspy.stream.Stream
:param st: stream object to convolve with source time function
:type half_duration: float
:param half_duration: the half duration of the source time function,
usually provided in moment tensor catalogs
:type source_decay: float
:param source_decay: the decay strength of the source time function, the
default value of 4 gives a Gaussian. A value of 1.68 mimics a triangle.
:type time_shift: float
:param time_shift: Time shift of the source time function in seconds
:type time_offset: If simulations have a value t0 that is negative, i.e. a
starttime before the event origin time. This value will make sure the
source time function doesn't start convolving before origin time to
avoid non-zero onset times
:rtype: obspy.stream.Stream
:return: stream object which has been convolved with a source time function
"""
logger.info(f"convolving data w/ Gaussian (t/2={half_duration:.2f}s)")
sampling_rate = st[0].stats.sampling_rate
half_duration_in_samples = round(half_duration * sampling_rate)
# generate gaussian function
decay_rate = half_duration_in_samples / source_decay
a = 1 / (decay_rate ** 2)
t = np.arange(-half_duration_in_samples, half_duration_in_samples, 1)
gaussian_stf = np.exp(-a * t**2) / (np.sqrt(np.pi) * decay_rate)
# prepare time offset machinery
if time_offset:
time_offset_in_samp = int(time_offset * sampling_rate)
# convolve each trace with the soure time function and time shift if needed
for tr in st:
if time_shift:
tr.stats.starttime += time_shift
data_out = np.convolve(tr.data, gaussian_stf, mode="same")
tr.data = data_out
return st