Source code for pyatoa.tests.test_executive

"""
Test the functionalities of the Executive class which runs many Managers
"""
import os
import pytest
import numpy as np
from pyatoa import Config, Executive, logger


# Turn off the logger for tests
logger.propogate = False
logger.setLevel("CRITICAL")


pytest.skip(allow_module_level=True)

@pytest.fixture
[docs] def events(): """ A list of GeoNet event ids used for gathering metadata from GeoNet client """ event_ids = ["2018p130600", "2012p242656"] return event_ids
@pytest.fixture
[docs] def stations(): """ A list of GeoNet station codes used for gathering metadata and waveforms """ location = "*" channel = "HH?" codes = ["NZ.BFZ", "NZ.KNZ", "NZ.COVZ", "NZ.PXZ", "NZ.WEL"] codes = [f"{c}.{location}.{channel}" for c in codes] return codes
@pytest.fixture
[docs] def config(events): """ A preset Config object that specifies where to grab data from, which already exists in the test data directory """ # syn_path = "./test_data/test_executive/{}" # synthetics = [os.path.abspath(syn_path.format(_)) for _ in events] syn_path = [os.path.abspath("./test_data/test_executive/")] cfg = Config(iteration=1, step_count=0, min_period=10, max_peropd=30, client="GEONET", pyflex_preset="default", adj_src_type="cc_traveltime_misfit", paths={"synthetics": syn_path}) return cfg
[docs] def test_executive_single_event_single_station_no_concurrent(tmpdir, config, events, stations): """ Attempt a single event single station processing without using concurrency """ exc = Executive(event_ids=events[0], station_codes=stations[0], config=config, cwd=tmpdir.strpath) misfit = exc.process_station(f"{events[0]}{exc.cat}{stations[0]}") assert(pytest.approx(misfit, .001) == 1.6696)
[docs] def test_executive_single_event_single_station(tmpdir, config, events, stations): """ Attempt a single event single station processing with concurrency """ exc = Executive(event_ids=events[0], station_codes=stations[0], config=config, cwd=tmpdir.strpath) misfits = exc.process() misfit = misfits[events[0]][stations[0]] assert(pytest.approx(misfit, .001) == 1.6696)
[docs] def test_executive_single_event_multi_station(tmpdir, config, events, stations): """ Attempt a single event multi station processing with concurrency """ exc = Executive(event_ids=events[0], station_codes=stations, config=config, cwd=tmpdir.strpath, max_events=1, max_stations=os.cpu_count()) misfits = exc.process() assert(len(misfits) == 1) assert(len(misfits[events[0]]) == len(stations)) misfit = misfits[events[0]][stations[4]] assert(pytest.approx(misfit, .001) == 0.76983)
# def test_executive_multi_event_multi_station(tmpdir, config, events, # stations): # """ # !!! This test is causing my computer to crash, not sure why, must rework # # Attempt a single event multi station processing with concurrency. # Only do 2 events and 2 stations max to avoid crashing out system # """ # exc = Executive(event_ids=events, station_codes=stations, # config=config, cwd=tmpdir.strpath, max_events=2, # max_stations=2) # misfits = exc.process() # assert(len(misfits) == 2) # assert(len(misfits[events[1]]) == len(stations)) # misfit = misfits[events[1]][stations[2]] # assert(pytest.approx(misfit, .001) == 12.242)