def wrapper_tide_compensation(in_path, args):
"""Compensate tidal effect for single SEG-Y."""
basepath, filename = os.path.split(in_path)
basename, suffix = os.path.splitext(filename)
xprint(f'Processing file < {filename} >', kind='info', verbosity=args.verbose)
default_txt_suffix = 'tide'
if args.inplace is True: # `inplace` parameter supersedes any `output_dir`
xprint('Updating SEG-Y inplace', kind='warning', verbosity=args.verbose)
path = in_path
else:
if args.output_dir is None: # default behavior
xprint('Creating copy of file in INPUT directory:\n', basepath, kind='info', verbosity=args.verbose)
out_dir = basepath
elif args.output_dir is not None and os.path.isdir(args.output_dir):
xprint('Creating copy of file in OUTPUT directory:\n', args.output_dir, kind='info', verbosity=args.verbose)
out_dir = args.output_dir
else:
raise FileNotFoundError(f'The output directory > {args.output_dir} < does not exist')
if args.txt_suffix is not None:
out_name = f'{basename}_{args.txt_suffix}'
else:
out_name = f'{basename}_{default_txt_suffix}'
out_path = os.path.join(out_dir, f'{out_name}{suffix}')
# sanity check
if os.path.isfile(out_path):
xprint('Output file already exists and will be removed!', kind='warning', verbosity=args.verbose)
os.remove(out_path)
copy2(in_path, out_path)
path = out_path
# get coordinate byte positions
src_coords_bytes = TRACE_HEADER_COORDS.get(args.src_coords)
# read SEGY file
with segyio.open(path, 'r+', strict=False, ignore_geometry=True) as src:
n_traces = src.tracecount # total number of traces
dt = segyio.tools.dt(src) / 1000 # sample rate [ms]
n_samples = src.samples.size # total number of samples
# twt = src.samples # two way travel time (TWTT) [ms]
xprint(f'n_traces: {n_traces}', kind='debug', verbosity=args.verbose)
xprint(f'n_samples: {n_samples}', kind='debug', verbosity=args.verbose)
xprint(f'dt: {dt}', kind='debug', verbosity=args.verbose)
tracl = src.attributes(segyio.TraceField.TRACE_SEQUENCE_LINE)[:]
tracr = src.attributes(segyio.TraceField.TRACE_SEQUENCE_FILE)[:]
fldr = src.attributes(segyio.TraceField.FieldRecord)[:]
# get seismic data [amplitude]; transpose to fit numpy data structure
data_src = src.trace.raw[:].T # eager version (completely read into memory)
# get scaled coordinates
xprint('Reading coordinates from SEG-Y file', kind='debug', verbosity=args.verbose)
xcoords, ycoords, coordinate_units = scale_coordinates(src, src_coords_bytes)
# get source & destination CRS
crs_src = pyproj.crs.CRS(args.crs_src)
crs_dst = pyproj.crs.CRS('epsg:4326')
# transform coordinate to geographical coordinates
transformer = pyproj.transformer.Transformer.from_crs(crs_src, crs_dst, always_xy=True)
lon, lat = transformer.transform(xcoords, ycoords, errcheck=True)
# filter duplicate lat/lon locations
latlon = np.vstack((lat, lon)).T
latlon_unique = np.unique(
latlon, axis=0, return_index=True, return_inverse=True, return_counts=True
)
latlon_uniq, latlon_uniq_idx, latlon_uniq_inv, latlon_uniq_cnts = latlon_unique
# get unique lat/lons locations
lat, lon = latlon_uniq[:, 0], latlon_uniq[:, 1]
# get datetime elements from trace headers
xprint('Reading timestamps from SEG-Y file', kind='debug', verbosity=args.verbose)
times = []
for h in src.header:
year = h[segyio.TraceField.YearDataRecorded]
day_of_year = h[segyio.TraceField.DayOfYear]
hour = h[segyio.TraceField.HourOfDay]
minute = h[segyio.TraceField.MinuteOfHour]
second = h[segyio.TraceField.SecondOfMinute]
dt_string = datetime.datetime.strptime(
f'{year}-{day_of_year} {hour}:{minute}:{second}', '%Y-%j %H:%M:%S'
).strftime('%Y-%m-%dT%H:%M:%S')
times.append(np.datetime64(dt_string, 's'))
times = np.asarray(times)
# get times masked by unique lat/lons locations
times = times[latlon_uniq_idx]
# predict tides along SEG-Y file at given times
xprint('Predicting tidal elevation along profile', kind='debug', verbosity=args.verbose)
tides_track = tide_predict(
args.model_dir,
lat,
lon,
times,
args.constituents,
correct_minor=args.correct_minor,
mode='track',
)[
latlon_uniq_inv
] # set tide for original lat/lon locations (INCLUDING duplicates)
# reset times (INCLUDING duplicates) -> for output
times = times[latlon_uniq_inv]
# apply tidal compensation to seismic traces
xprint('Compensating tide', kind='debug', verbosity=args.verbose)
data_comp = compensate_tide(
data_src, tides_track, dt, tide_units='meter', units='ms', verbosity=args.verbose
)
# set output amplitudes (transpose to fit SEG-Y format)
xprint('Writing compensated data to disk', kind='debug', verbosity=args.verbose)
src.trace = np.ascontiguousarray(data_comp.T, dtype=data_src.dtype)
# update textual header
text = get_textual_header(path)
text_updated = add_processing_info_header(
text, 'TIDE COMPENSATION', prefix='_TODAY_', newline=True
)
write_textual_header(path, text_updated)
if args.write_aux:
tides_twt = depth2twt(tides_track)
tides_samples = np.around(depth2samples(tides_track, dt=dt, units='ms'), 0)
aux_path = os.path.join(out_dir, f'{out_name}.tid')
xprint(f'Creating auxiliary file < {out_name}.tid >', kind='debug', verbosity=args.verbose)
with open(aux_path, 'w', newline='\n') as fout:
fout.write('tracl,tracr,fldr,time,tide_m,tide_ms,tide_samples\n')
for i in range(tides_track.size):
line = (
f'{tracl[i]},{tracr[i]},{fldr[i]},'
+ f'{np.datetime_as_string(times[i],"s")},'
+ f'{tides_track[i]:.6f},{tides_twt[i]*1000:.3f},'
+ f'{tides_samples[i]:.0f}\n'
)
fout.write(line)