Processing w/ MPI
The following example code processes (i.e., preprocess, window, generate adjoint source) a number of synthetic waveforms in parallel using Pyatoa and MPI (with mpi4py).
The script generates figures, adjoint sources and a text file with misfit values for each source-receiver pair.
Warning
This example requires mpi4py and PySEP which are not a dependency of Pyatoa. To install to the Conda environment created from the installation section, you can run:
$ conda activate pyatoa
$ conda install mpi4py
$ pip install pysep-adjtomo
Example Data
This example requires data stored in the Pyatoa repository. From a working directory, you can grab this data using the following commands:
cd ${PATH_TO_WORKDIR}
cp -r ${PATH_TO_PYATOA}/pyatoa/tests/test_data/process_data_w_mpi.tar.gz .
tar xf process_data_w_mpi.tar.gz
This will create a directory called data
which contains synthetic waveforms
generated from SeisFlows Example 2.
In Example 2, which uses SPECFEM2D as a numerical solver, synthetic data are generated using a homogeneous halfspace model, while observed data are generated using a checkerboard perturbation model.
Script
To run this example on 4 cores, copy the script below to the filename
process_data_w_mpi.py
and run the following (note: you will need to
download the example data beforehand).
mpirun -n 4 process_data_w_mpi.py
Note
This script is also in the repo here:
pyatoa/pyatoa/scripts/process_data_w_mpi.py
#!/usr/bin/env python3
import os
import numpy as np
import matplotlib.pyplot as plt
from mpi4py import MPI
from obspy import UTCDateTime
from pysep.utils.io import read_sem
from pyatoa import Config, Manager
from pyatoa import logger as ptlogger
from pyflex import logger as pflogger
# Avoid overwhelming parallel log statements
for logger in [ptlogger, pflogger]:
logger.setLevel("CRITICAL")
def get_rank_start_stop(ntask, nproc):
"""
Determine the share of work that each processor gets based on the total
number of tasks `ntask`, and the total number of processors `nproc`
https://stackoverflow.com/questions/15658145/how-to-share-work-roughly-\
evenly-between-processes-in-mpi-despite-the-array-size
:type ntask: int
:param ntask: total number of tasks to divvy up
:type nproc: int
:param nproc: number of processors to split `ntask` over
:rtype: list of tuples
:return: the start and stop `ntask` number for each processor `nproc`
"""
count = ntask // nproc
remainder = ntask % nproc
indices = []
for rank in range(0, nproc):
if rank < remainder:
# The first 'remainder' ranks get 'count + 1' tasks each
start = rank * (count + 1)
stop = start + count
else:
# The remaining 'size - remainder' ranks get 'count' task each
start = rank * count + remainder
stop = start + (count - 1)
indices.append((start, stop))
return indices
if __name__ == "__main__":
# Initialize MPI
comm = MPI.COMM_WORLD
# Set up data structure and configuration parameters in rank 0
if comm.rank == 0:
# Define paths to data and output
data_path = "./data/{ev}/{choice}/{sta}.semd"
adjsrc_path = "./data/{ev}/adj"
fig_path = "./figures"
results_fid = "./misfit_results.txt"
# Provides origin time for synthetic data which has none
dummy_time = UTCDateTime("2000-01-01")
# Create 30 unique event and station pairs
_event_names = ["001", "002", "003"]
_station_names = [f"AA.S{i:0>6}.BXY" for i in range(10)]
evsta_pairs = []
for event_name in _event_names:
for sta_name in _station_names:
evsta_pairs.append((event_name, sta_name))
# Generate paths for output results
if not os.path.exists(fig_path):
os.mkdir(fig_path)
for ev in _event_names:
if not os.path.exists(adjsrc_path.format(ev=ev)):
os.mkdir(adjsrc_path.format(ev=ev))
# Determine how to divvy up the event-station pairs among processors
indices = get_rank_start_stop(ntask=len(evsta_pairs), nproc=comm.size)
# Generate Config object that controls processing
config = Config(min_period=10., max_period=100., component_list=["Y"],
pyflex_preset="default", adj_src_type="cc_traveltime",
st_obs_type="syn", st_syn_type="syn"
)
else:
evsta_pairs = None
data_path = None
fig_path = None
adjsrc_path = None
results_fid = None
dummy_time = None
indices = None
config = None
# Broadcast generated data and config to each rank
evsta_pairs = comm.bcast(evsta_pairs, root=0)
data_path = comm.bcast(data_path, root=0)
fig_path = comm.bcast(fig_path, root=0)
adjsrc_path = comm.bcast(adjsrc_path, root=0)
indices = comm.bcast(indices, root=0)
dummy_time = comm.bcast(dummy_time, root=0)
config = comm.bcast(config, root=0)
if comm.rank == 0:
print(f"{len(evsta_pairs)} total tasks to be accomplished with "
f"{comm.size} processors")
# Each rank will process a different part of the event-station list
start, stop = indices[comm.rank]
# Initiate empty array to store misfit and measurement windows
sendbuf = np.empty([stop - start + 1, 3], dtype=float)
# Main processing for each rank: read data, process, write adjoint sources
for i, evsta_pair in enumerate(evsta_pairs[start: stop + 1]):
ev, sta = evsta_pair
# Read in synthetic example data
st_obs = read_sem(data_path.format(ev=ev, choice="obs", sta=sta),
origintime=dummy_time)
st_syn = read_sem(data_path.format(ev=ev, choice="syn", sta=sta),
origintime=dummy_time)
# Standard Pyatoa processing workflow
mgmt = Manager(config=config, st_obs=st_obs, st_syn=st_syn)
mgmt.standardize()
mgmt.preprocess()
mgmt.window()
mgmt.measure()
# Generate plot and adjoint source
mgmt.write_adjsrcs(path=adjsrc_path.format(ev=ev), write_blanks=True)
mgmt.plot(choice="wav", save=f"{fig_path}/{ev}_{sta}.png", show=False)
plt.close()
# Save misfit results to send buffer, which will be broadcast to Rank0
sendbuf[i] = np.array([start + i, mgmt.stats.misfit, mgmt.stats.nwin],
dtype=float)
# Empty receiving buffer to collect results from all other ranks
if comm.rank == 0:
recvbuf = np.empty([len(evsta_pairs), 3], dtype=float)
else:
recvbuf = None
# Gather misfit results from all ranks on Rank 0
comm.Gather(sendbuf, recvbuf, root=0)
# Use main rank to write out misfit information and number of windows
if comm.rank == 0:
with open(results_fid, "w") as f:
for result in recvbuf:
idx, misfit, nwin = result
ev, sta = evsta_pairs[int(idx)]
f.write(f"{ev} {sta} {misfit:.2f} {int(nwin)}\n")
Results
After successfully running the script, the results of the processing workflow will be stored in multiple directories/files.
figures/
: Contains waveform figures showing observed and synthetic traces, misfit windows and adjoint sources for each source receiver pair.data/*/adj/*.adj
: Adjoint source files that are formatted as two-column ASCII files (time v. amplitude), which are ready to be used by SPECFEM.misfit_results.txt
: A text file with information about misfit and number of measurement windows used for each source-receiver pair.