Source code for pyatoa.tests.test_asdf_utils

"""
Test the PyASDF ASDFDataSet auxiliary utilities

!!! TO DO: utils.asdf.write tests
"""
import os
import pytest
from obspy import read, read_events, read_inventory
from pyasdf import ASDFDataSet
from pyatoa import Config, Manager
from pyatoa.utils.asdf import (add, clean, load)


@pytest.fixture
[docs] def empty_dataset(tmpdir): """Re-used test data pointing to STATIONS file""" fid = os.path.join(tmpdir, "empty_dataset.h5") # Make sure the dataset is actually empty if os.path.exists(fid): os.remove(fid) return ASDFDataSet(fid)
@pytest.fixture
[docs] def dataset(): """Filled ASDFDataSet""" return ASDFDataSet("./test_data/test_ASDFDataSet.h5")
@pytest.fixture
[docs] def st_obs(): """ Raw observed waveforms from station NZ.BFZ.HH? for New Zealand event 2018p130600 (GeoNet event id) """ return read("./test_data/test_obs_data_NZ_BFZ_2018p130600.ascii")
@pytest.fixture
[docs] def st_syn(): """ Synthetic data stream generated using Specfem3D and focal mechanism for 2018p130600. Minimum resolved period roughly 10s. """ return read("./test_data/test_syn_data_NZ_BFZ_2018p130600.ascii")
@pytest.fixture
[docs] def cat(): """ ObsPy Event Catalog for New Zealand based event with GeoNet Event ID: 2018p130600 """ return read_events("./test_data/test_catalog_2018p130600.xml")
@pytest.fixture
[docs] def event(cat): """ Event from Catalog """ return cat[0]
@pytest.fixture
[docs] def inv(): """ StationXML information for station NZ.BFZ.HH? """ return read_inventory("./test_data/test_dataless_NZ_BFZ.xml")
@pytest.fixture
[docs] def config(): """ Default Pyatoa Config object """ return Config(event_id="2018p130600")
@pytest.fixture
[docs] def mgmt_pre(config, event, st_obs, st_syn, inv): """ A manager filled with data but pre-workflow """ return Manager(config=config, event=event, st_obs=st_obs, st_syn=st_syn, inv=inv)
@pytest.fixture
[docs] def mgmt_post(mgmt_pre): """ A manager that has completed the full workflow """ mgmt_pre.standardize() mgmt_pre.preprocess(remove_response=True, output="DISP") mgmt_pre.window() mgmt_pre.measure() return mgmt_pre
[docs] def test_add_misfit_windows(empty_dataset, mgmt_post): """ Test adding misfit windows to an ASDFDataSet :return: """ path = mgmt_post.config.aux_path add.add_misfit_windows(windows=mgmt_post.windows, ds=empty_dataset, path=path) windows = empty_dataset.auxiliary_data.MisfitWindows[path] assert(len(windows.list()) == 2) for comp, window_list in mgmt_post.windows.items(): assert(len(window_list) == 1) window_check = window_list[0] # Dynamically construct station tags to access the individual parameters net, sta, loc, cha = window_check.channel_id.split(".") tag = f"{net}_{sta}_{comp}_0" par = windows[tag].parameters # Check a few parameters assert(par["left_index"] == window_check.left) assert(par["right_index"] == window_check.right) assert(par["absolute_starttime"] == str(window_check.absolute_starttime) )
# Retrieve and check phase arrivals # !!! This doesn't work because PyFlex is returning multiple entries # !!! for the same phase. Need to look into this. # for arrival in window_check.phase_arrivals: # assert(par[f"phase_arrival_{arrival['name']}"] == arrival["time"])
[docs] def test_add_adjoint_sources(empty_dataset, mgmt_post): """ Test adding adjoint sources to an ASDFDataSet :return: """ path = mgmt_post.config.aux_path add.add_adjoint_sources(adjsrcs=mgmt_post.adjsrcs, ds=empty_dataset, path=path, time_offset=mgmt_post.stats.time_offset_sec ) adjsrcs = empty_dataset.auxiliary_data.AdjointSources[path] assert(len(adjsrcs.list()) == 2) # Just check misfit value is correct for comp, adjsrc in mgmt_post.adjsrcs.items(): tag = f"NZ_BFZ_BX{comp}" misfit_check = adjsrcs[tag].parameters["misfit"] assert(adjsrc.misfit == misfit_check)
[docs] def test_clean_dataset(empty_dataset, mgmt_post): """ Test dataset clean functions. Need to perform tasks on a dataset we create here, otherwise we may permanently affect test data if we use a pre-built ASDFDataSet """ assert(not hasattr(empty_dataset.auxiliary_data, "MisfitWindows")) mgmt_post.write_to_dataset(ds=empty_dataset) assert(hasattr(empty_dataset.auxiliary_data, "MisfitWindows")) clean.clean_dataset(empty_dataset, fix_windows=False) assert(not hasattr(empty_dataset.auxiliary_data, "MisfitWindows"))
[docs] def test_clean_dataset_fix_windows(empty_dataset, mgmt_post): """ Test cleaning a dataset but retaining windows :return: """ assert(not hasattr(empty_dataset.auxiliary_data, "MisfitWindows")) mgmt_post.write_to_dataset(ds=empty_dataset) assert(hasattr(empty_dataset.auxiliary_data, "MisfitWindows")) clean.clean_dataset(empty_dataset, fix_windows=True) assert(hasattr(empty_dataset.auxiliary_data, "MisfitWindows")) assert(not hasattr(empty_dataset.auxiliary_data, "AdjointSources"))
[docs] def test_load_windows(dataset): """ Test the function that returns windows in the Pyflex output format from an ASDFDataSet """ windows = load.load_windows(ds=dataset, net="NZ", sta="BFZ", iteration="i01", step_count="s00") ds_windows = dataset.auxiliary_data.MisfitWindows.i01.s00 check_val = ds_windows.NZ_BFZ_N_0.parameters["max_cc_value"] assert(windows["N"][0].max_cc_value == check_val)
[docs] def test_load_previous_windows(empty_dataset, mgmt_post): """ Test the function that returns windows in the Pyflex output format from an ASDFDataSet """ # Add two sets of windows to the dataset for iter_ in ["i01", "i02"]: mgmt_post.config.iteration = iter_ mgmt_post.config.step_count = "s00" mgmt_post.write_to_dataset(ds=empty_dataset) windows = empty_dataset.auxiliary_data.MisfitWindows check = load.previous_windows(windows, iteration="i02", step_count="s00") # Access a private attribute to check that we have the correct iter/step path = check._AuxiliaryDataAccessor__auxiliary_data_type _, iter_, step = path.split("/") assert(iter_ == "i01") assert(step == "s00")
[docs] def test_load_adjsrcs(dataset): """ Test the function that returns windows in the Pyflex output format from an ASDFDataSet :param dataset: :return: """ adjsrcs = load.load_adjsrcs(ds=dataset, net="NZ", sta="BFZ", iteration="i01", step_count="s00") ds_adjsrcs = dataset.auxiliary_data.AdjointSources.i01.s00 check_val = ds_adjsrcs.NZ_BFZ_BXN.parameters["misfit"] assert(adjsrcs["N"].misfit == check_val)