from collections import defaultdict
import math
import os
import numpy as np
import pandas as pd
from obspy import UTCDateTime
from obspy.geodetics import gps2dist_azimuth
[docs]
def decimate(phasepicks, data, ev_lon, ev_lat, az_bin, dist_bin, max_bin):
"""
Detect regions covered by clusters of stations within specific bins and set a maximum amount of stations within each bin, removing the excess.
Args:
phasepicks (list): list of phase picks
data (dict): dictionary that contains---among others---the station coordinates for each station
ev_lon (float): event's longitude
ev_lat (float): event's latitude
az_bin (float): azimuthal bins
dist_bin (float): epicentral distance bins
max_bin (int): maximum amount of stations allowed per bin
Returns:
filtered_phasepicks (list): list of stations with picks, where there's a maximum of stations per bin
"""
original_phasepicks = phasepicks.copy()
if len(original_phasepicks[0]) > 3:
new_phasepicks, stations_considered = [], []
for pick in phasepicks:
if pick[0] not in stations_considered:
new_phasepicks.append((pick[0], pick[2], pick[3]))
stations_considered.append(pick[0])
phasepicks = new_phasepicks
stations_with_picks = []
for pick in phasepicks:
stations_with_picks.append(pick[0])
distances, azimuths = [], []
for station in stations_with_picks:
sta_lat, sta_lon = data[station]['coords'][0], data[station]['coords'][1]
epic_dist, azimuth, _ = gps2dist_azimuth(ev_lat, ev_lon, sta_lat, sta_lon)
distances.append(epic_dist * 0.001)
azimuths.append(azimuth)
distances = np.array(distances)
azimuths = np.array(azimuths)
az_bins = (azimuths // az_bin).astype(int)
dist_bins = (distances // dist_bin).astype(int)
bin_keys = list(zip(az_bins, dist_bins))
binned_stations = defaultdict(list)
for idx, key in enumerate(bin_keys):
binned_stations[key].append(idx)
stations_to_remove = []
for station_list in binned_stations.values():
if len(station_list) > max_bin:
sorted_by_dist = sorted(station_list, key=lambda i: distances[i])
to_remove = sorted_by_dist[max_bin:]
stations_to_remove.extend(to_remove)
stations_to_remove_names = [stations_with_picks[i] for i in stations_to_remove]
filtered_phasepicks = [pick for pick in original_phasepicks if pick[0] not in stations_to_remove_names]
return filtered_phasepicks
[docs]
def tt_theo_before_assoc(ev_time, teo_p_time, teo_s_time, pick, tol_p, tol_s):
"""
This function decides if a predicted pick is close enough to the theoretical pick (for example, obtained using Taup)
Args:
ev_time (str) Origin time of the event. Format: yyyy-mm-dd hh:mm:ss.ss
teo_p_time (float): Arrival time of the P-wave, in seconds after the event
teo_s_time (float): Arrival time of the S-wave, in seconds after the event
pick (SeisBench pick): The pick as predicted by the models in SeisBench
tol_p (int or float): Seconds (in absolute value) in which a predicted P-pick is allowed to differ from the theoretical P-pick
tol_s (int or float): Seconds (in absolute value) in which a predicted S-pick is allowed to differ from the theoretical S-pick
"""
obs_time = pick.peak_time - ev_time
if pick.phase == 'P':
theo_obs_abs_diff = abs(obs_time - teo_p_time)
if theo_obs_abs_diff > tol_p:
verdict = 'fail'
else:
verdict = 'pass'
else:
theo_obs_abs_diff = abs(obs_time - teo_s_time)
if theo_obs_abs_diff > tol_s:
verdict = 'fail'
else:
verdict = 'pass'
return verdict
[docs]
def generate_csv(outputs, data, snr_data, ev_time):
"""
Generates CSV files with pick detections
Args:
outputs (dict): The predicted phases/arrival times obtained from SeisBench for each station
data (dict): A dictionary with the nearest stations (ObsPy streams) to the event which contain useful channels for phase-picking
snr_data (dict): Dictionary with the signal-to-noise ratio for each pick at each station
ev_time (str): ev_time (str): Origin time of the event. Format: yyyy-mm-dd hh:mm:ss.ss
Returns:
prediction result files in CSV format for each station
"""
ev_time = str(UTCDateTime(ev_time))
os.mkdir(f"{ev_time}_tiebenn_loc/csv_picks")
for stat in outputs[0]['station'].drop_duplicates():
outputs_sta = outputs[0][outputs[0]['station'] == stat]
net = []; sta = []; st_lat = []; st_lon = []; st_elv = []; phase = []; arrival_time = []; probability = []; snr = []
for dets in range(len(outputs_sta)):
net.append(data[stat]['network'])
sta.append(stat)
st_lat.append(data[stat]['coords'][0])
st_lon.append(data[stat]['coords'][1])
st_elv.append(data[stat]['coords'][2])
for phs in outputs_sta['phase']:
phase.append(phs)
try:
snr.append(snr_data[stat][phs])
except Exception:
snr.append(np.nan)
for arrt in outputs_sta['peak_time']:
arrival_time.append(arrt)
for prob in outputs_sta['peak_value']:
probability.append(prob)
df = {'network': net, 'station': sta, 'station_lat': st_lat, 'station_lon': st_lon, 'station_elv': st_elv, 'phase': phase, 'arrival_time': arrival_time, 'probability': probability, 'snr': snr}
dataframe = pd.DataFrame(data=df)
filename = f"{ev_time}_tiebenn_loc/csv_picks/{stat}_{ev_time}_prediction_results.csv"
dataframe.to_csv(filename, index=False)
return
[docs]
def str2bool(v):
"""
Tries to interpret an input as a boolean argument. Expected true boolean arguments are: yes, true, t, y, and 1. Expected false boolean arguments: no, false, f, n, 0
Args:
v (str): Argument to be interpreted as True or False
Returns:
v (bool): The string interpreted as boolean
"""
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
[docs]
def chan_comma(chanlist):
"""
Transforms the channel list for a given station in a string of channels, each separated by a comma. This is the multiple channel format for a FDSN waveform request
Args:
chanlist (list): List of channels from which waveforms will be downloaded
Returns:
string (str): String of channels separated by a comma
"""
string = ''
for i in range(len(chanlist)):
string = f"{string}{chanlist[i]},"
string = string.rstrip(string[-1])
return string
[docs]
def strmonth2num(strmonth):
"""
It gets the month number corresponding to the 3-character month (e.g. for the input 'Jan' the output is 1)
Args:
strmonth (str): Month in 3-character format (not case-sensitive)
Returns:
nummonth (int): Month number
"""
months = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}
nummonth = str(months[strmonth.lower()])
return nummonth
[docs]
def get_snr(data, picks, wlen):
"""
Estimates the signal to noise ratio (SNR) of a seismogram. Adapted from the EQTransformer repository (https://github.com/smousavi05/EQTransformer)
Args:
data (ObsPy Stream): Stream with waveforms and metadata for a given station
picks (dict): Sample point where a specific phase arrives
wlen (int, positive): The length of the window in seconds for calculating the SNR
Returns:
snr (dict): A dictionary with the estimated SNR in db for each pick
"""
snr = {}
st_t = []; et_t = []; lendat = []
for channels in range(len(data)):
st_t.append(data[channels].stats.starttime)
et_t.append(data[channels].stats.endtime)
lendat.append(data[channels].stats.npts)
starttime = min(st_t); endtime = max(et_t); lendata = max(lendat)
def merge_pad(st, st_sta, st_end):
cha = []; merge = False
for ch in range(len(st)):
if st[ch].stats.channel not in cha:
cha.append(st[ch].stats.channel)
else:
merge = True
if merge:
st.merge()
pad = False
for ch in range(len(st)):
if st[ch].stats.starttime - st_sta > 0 or st[ch].stats.endtime - st_end < 0:
pad = True
if pad:
st.trim(starttime=st_sta, endtime=st_end, pad=True, fill_value=0)
return st
for p in picks['phase']:
pick = picks[picks['phase'] == p]
peak = UTCDateTime(str(pick['peak_time'].tolist()[0]))
w_noise = data.copy()
w_signal = data.copy()
if (peak - starttime) >= wlen and (endtime - peak) > wlen:
nw_sta = peak - wlen; nw_end = peak
sw_sta = peak; sw_end = peak + wlen
if (peak - starttime) < wlen and (endtime - peak) > wlen:
nw_sta = starttime; nw_end = peak
sw_sta = peak; sw_end = peak + wlen
if (endtime - peak) <= wlen:
nw_sta = peak - wlen; nw_end = peak
sw_sta = peak; sw_end = endtime
nw1 = w_noise.trim(starttime=nw_sta, endtime=nw_end)
sw1 = w_signal.trim(starttime=sw_sta, endtime=sw_end)
nw1 = merge_pad(st=nw1, st_sta=nw_sta, st_end=nw_end)
sw1 = merge_pad(st=sw1, st_sta=sw_sta, st_end=sw_end)
snr[pick['phase'].tolist()[0]] = round(10 * math.log10((np.percentile(sw1, 95) / np.percentile(nw1, 95)) ** 2), 1)
return snr
[docs]
def station_density(epicenter, stations):
"""
Calculates station density within a circular area around the epicenter.
Args:
epicenter (tuple): (latitude, longitude) for the event
stations (list): list of tuples [(latitude, longitude), ...] for station locations
Returns:
density (float): Station density (stations per square km).
"""
distances = np.array([gps2dist_azimuth(epicenter[0], epicenter[1], st[0], st[1])[0] / 1000.0
for st in stations
])
if len(stations) < 5:
distance = max(distances)
else:
distance = np.percentile(distances, 80)
area = np.pi * (distance ** 2)
density = len(stations) / area if area > 0 else 0
return density
[docs]
def calculate_azimuths(epicenter, stations):
"""
Calculates azimuths from the epicenter to each station.
Args:
epicenter (tuple): (epicenter_latitude, epicenter_longitude)
stations (list): List of (lat, lon) tuples of the coordinates of each station
Returns:
azimuths (NumPy array): Array of station azimuths
"""
lat_e, lon_e = epicenter
azimuths = []
for lat_s, lon_s in stations:
_, azimuth, _ = gps2dist_azimuth(lat_e, lon_e, lat_s, lon_s)
azimuths.append(azimuth)
return np.array(azimuths)
[docs]
def azimuthal_gaps(event_lat, event_lon, station_coords):
"""
Calculates the primary and secondary azimuthal gaps given an event location and station coordinates.
Args:
event_lat (float): Epicentral latitude
event_lon (float): Epicentral longitude
station_coords (list): List of station coordinates tuples: [(lat1, lon1), (lat2, lon2), (lat3, lon3), ...]
Returns:
tuple: (primary_gap, secondary_gap:
"""
def calculate_azimuth(lat1, lon1, lat2, lon2):
"""
Calculate the azimuth from one geographic coordinate to another.
"""
lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
delta_lon = lon2 - lon1
x = np.sin(delta_lon) * np.cos(lat2)
y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(delta_lon)
azimuth = np.degrees(np.arctan2(x, y))
return (azimuth + 360) % 360
azimuths = [calculate_azimuth(event_lat, event_lon, lat, lon) for lat, lon in station_coords]
azimuths_sorted = np.sort(azimuths)
extended_azimuths = np.append(azimuths_sorted, azimuths_sorted[0] + 360)
gaps = np.diff(extended_azimuths)
primary_gap = np.max(gaps)
primary_index = np.argmax(gaps)
primary_pair = (azimuths_sorted[primary_index], azimuths_sorted[(primary_index + 1) % len(azimuths_sorted)])
secondary_gap = 0
secondary_pair = None
for i in range(len(azimuths_sorted)):
reduced_azimuths = np.delete(azimuths_sorted, i)
extended_reduced = np.append(reduced_azimuths, reduced_azimuths[0] + 360)
reduced_gaps = np.diff(extended_reduced)
largest_gap = np.max(reduced_gaps)
if largest_gap > secondary_gap:
secondary_gap = largest_gap
secondary_index = np.argmax(reduced_gaps)
secondary_pair = (reduced_azimuths[secondary_index], reduced_azimuths[(secondary_index + 1) % len(reduced_azimuths)])
return primary_gap, secondary_gap
[docs]
def normalize(param, mode, lb, ub):
"""
Parameter normalization using robust statistics. The resulting normalized parameter ranges between 0 and 1. Each parameter will be normalized after the modes and bundary values of Ramos et al. (2025, in preparation).
Args:
param (float): Parameter to be normalized
mode (str): Choose a normalization mode between 'simple' (simple robust normalization using lb and ub) or 'log' (logarithmic 10-base, robust normalization)
lb (float): Lower boundary of normalization. If param < lb, then it will be clipped to 0
ub (float): Upper boundary of normalization. If param > ub, then it will be clipped to 1. Note: lb as well as ub for the parameters to be normalized using the log mode, are actually the logarithm of the normalization boundaries
Returns:
norm_param (float): The normalized parameter
"""
if mode == 'simple':
norm_param = (np.array(param) - lb) / (ub - lb)
elif mode == 'log':
epsilon = 1e-6
log_param = np.log10(np.array(param) + epsilon)
norm_param = (log_param - lb) / (ub - lb)
norm_param = np.clip(norm_param, 0, 1)
return norm_param
[docs]
def calculate_lqs(loc_file, sta_file):
"""
Calculates the Location Quality Score (LQS) metric, as in the work of Ramos et al. (2025, in preparation). This metric describes the quality of automatic locations based on location parameters and station network distribution in the vicinities of the epicenter.
Args:
loc_file (str): Full path to location file in NonLinLoc format (see section 'Formats' in http://alomax.free.fr/nlloc/)
sta_file (str): Full path to station file in NonLinLoc format (see section 'Formats' in http://alomax.free.fr/nlloc/)
Returns:
parameters (Pandas Dataframe): A dataframe with the original event datetime (from the catalog) and the datetime calculated by NonLinLoc, the LQS metric of the event and the eight normalized parameters used for the calculation of the LQS.
"""
print('Generating radarplot from LQS value...')
st4loc = []
with open(loc_file, 'r') as f:
for line in f:
if len(line.split()) > 1:
if line.split()[0] == 'COMMENT':
datetime_orig = str(UTCDateTime(f'{line.split()[1]} {line.split()[2]}'.split('"')[1]))
if line.split()[0] == 'GEOGRAPHIC':
li = line.split()
epicenter = (float(li[9]), float(li[11]))
datetime = UTCDateTime(int(li[2]), int(li[3]), int(li[4]), int(li[5]), int(li[6]), float(li[7]))
if line.split()[0] == 'QUALITY':
rms = float(line.split()[8])
npicks = float(line.split()[10])
near_sta = float(line.split()[14])
if line.split()[0] == 'STATISTICS':
li = line.split()
cov_xx = float(li[8]); cov_xy = float(li[10]); cov_xz = float(li[12])
cov_yx = cov_xy; cov_yy = float(li[14]); cov_yz = float(li[16])
cov_zx = cov_xz; cov_zy = cov_yz; cov_zz = float(li[18])
covariance = np.array([[cov_xx, cov_xy, cov_xz], [cov_yx, cov_yy, cov_yz], [cov_zx, cov_zy, cov_zz]])
detcov = np.linalg.det(covariance)
if line.split()[1] == '?' and float(line.split()[16]) != 0.0:
if line.split()[0] not in st4loc:
st4loc.append(line.split()[0])
f.close()
stations = []
with open(sta_file, 'r') as f:
for line in f:
if line.split()[1] in st4loc:
st_lat = float(line.split()[3]); st_lon = float(line.split()[4])
stations.append((st_lat, st_lon))
f.close()
density = station_density(epicenter, stations)
azimuths = calculate_azimuths(epicenter, stations)
aui = azimuthal_uniformity_index(azimuths)
azgaps = azimuthal_gaps(epicenter[0], epicenter[1], stations)
azgap = azgaps[0]
sec_azgap = azgaps[1]
norm_detcov = normalize(detcov, 'log', lb=-3.5168346269163018, ub=-0.11086218013379719)
norm_dens = normalize(density, 'log', lb=-3.1661675982662865, ub=-2.4270297099010847)
norm_azgap = normalize(azgap, 'simple', lb=23.246041073223335, ub=140)
norm_sazgap = normalize(sec_azgap, 'simple', lb=33.52630353367535, ub=160)
norm_aui = normalize(aui, 'simple', lb=4.927710011230288, ub=14.332695430500562)
norm_nsta = normalize(near_sta, 'log', lb=1.000000043429446, ub=1.4771212691961448)
if rms < 0.2 and density < 0.0006:
norm_rms = 1.0
else:
norm_rms = normalize(rms, 'log', lb=-0.849381390423459, ub=-0.2505623242671882)
norm_npicks = normalize(npicks, 'log', lb=1.7454491666816259, ub=2.1986570897031217)
weights = {'sta_den': 0.125, 'azgap': 0.05, 'sec_azgap': 0.1, 'rms': 0.1, 'near_sta': 0.1, 'det_cov': 0.35, 'aui': 0.125, 'npicks': 0.05}
LQS = weights['sta_den'] * norm_dens + weights['azgap'] * (1 - norm_azgap) + weights['sec_azgap'] * (1 - norm_sazgap) + weights['rms'] * (1 - norm_rms) + weights['near_sta'] * (1 - norm_nsta) + weights['det_cov'] * (1 - norm_detcov) + weights['npicks'] * norm_npicks + weights['aui'] * (1 - norm_aui)
norm_params = {'norm.det.cov': norm_detcov, 'norm.sta.den': norm_dens, 'norm.aui': norm_aui, 'norm.azgap': norm_azgap, 'norm.sec.azgap': norm_sazgap, 'norm.near.sta': norm_nsta, 'norm.rms': norm_rms, 'norm.npicks': norm_npicks}
merged = {'datetime_orig': datetime_orig, 'datetime': str(datetime), 'LQS': LQS} | norm_params
parameters = pd.DataFrame([merged])
return parameters