Check for vertical offsets in SEG-Y file(s) and apply zero padding. For this, the script uses the trace header keyword DelayRecordingTime and pads seismic traces with zeros to compensate variable recording starting times.

Pad seismic trace data recorded in window mode with a fixed length and variable DelayRecordingTimes. This function reads the DelayRecordingTime from the trace headers and pads the data at top and bottom with zeros.


    Amplitude array with samples (rows) and traces (columns). Transposed array read by segyio for numpy compatibility.

    Array of DelayRecordingTimes for each trace in SEG-Y file [ms].

    Number of traces in SEG-Y file.

    Sampling interval [ms].

    Two way travel times (TWTT) of every sample in a trace.


    Padded input data array in time dimension.

    Updated TWTT that fit the expanded time dimension.

    New number of traces in SEG-Y file.

    Trace indices where DelayRecordingTime changes.

    Minimum DelayRecordingTime in SEG-Y file [ms].

    Maximum DelayRecordingTime in SEG-Y file [ms].

Source code in pseudo_3D_interpolation\
def pad_trace_data(data, recording_delays, n_traces, dt, twt, verbosity=0):
    Pad seismic trace data recorded in window mode with a fixed length and variable `DelayRecordingTimes`.
    This function reads the DelayRecordingTime from the trace headers and pads the data at top and bottom with zeros.

    data : numpy.ndarray
        Amplitude array with samples (rows) and traces (columns).
        Transposed array read by segyio for numpy compatibility.
    recording_delays : numpy.ndarray
        Array of DelayRecordingTimes for each trace in SEG-Y file [ms].
    n_traces : int
        Number of traces in SEG-Y file.
    dt : float
        Sampling interval [ms].
    twt : numpy.ndarray
        Two way travel times (TWTT) of every sample in a trace.

    data_padded : numpy.ndarray
        Padded input data array in time dimension.
    twt_padded : numpy.ndarray
        Updated TWTT that fit the expanded time dimension.
    n_samples_padded : int
        New number of traces in SEG-Y file.
    idx_delay : numpy.ndarray
        Trace indices where DelayRecordingTime changes.
    min_delay : int
        Minimum DelayRecordingTime in SEG-Y file [ms].
    max_delay : int
        Maximum DelayRecordingTime in SEG-Y file [ms].

    # find indices where DelayRecordingTime changes
    idx_delay = np.where(np.roll(recording_delays, 1) != recording_delays)[0]
    # sanity check: in case first and last trace of file have same DelayRecordingTime!
    if idx_delay[0] != 0:
        idx_delay = np.insert(idx_delay, 0, 0, axis=0)

    # minimum and maximum DelayRecordingTime in SEG-Y
    min_delay = recording_delays.min()
    max_delay = recording_delays.max()

    # pad twt array from minimum delay to maximum delay + data window size
    twt_padded = np.arange(min_delay, max_delay + (twt[-1] - twt[0]) + dt, dt)
    n_samples_padded = len(twt_padded)

    # initialize padded data array
    data_padded = np.ndarray((len(twt_padded), n_traces))
    # make sure array is contiguous in memory (C order)
    data_padded = np.ascontiguousarray(data_padded, dtype=data.dtype)

    for i, delay in enumerate(idx_delay):
        xprint(f'{i}: {recording_delays[delay]} ms', kind='debug', verbosity=verbosity)
        # ---------- first data slices ----------
        if idx_delay[i] != idx_delay[-1]:
            # ----- get data slice -----
            data_tmp = data[:, idx_delay[i] : idx_delay[i + 1]]
            data_len = data_tmp.shape[0]
            xprint(f'data_tmp.shape: {data_tmp.shape}', kind='debug', verbosity=verbosity)

            # create array with samples to pad at the data top
            pad_top = np.zeros(
                    np.arange(min_delay, recording_delays[delay], dt).size,
                    idx_delay[i + 1] - idx_delay[i],
            xprint(f'pad_top:    {pad_top.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, 0, pad_top, axis=0)

            # create array with samples to pad at the data bottom
            pad_bottom = np.zeros(
                [twt_padded.size - data_len - pad_top.shape[0], idx_delay[i + 1] - idx_delay[i]]
            xprint(f'pad_bottom:    {pad_bottom.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, data_tmp.shape[0], pad_bottom, axis=0)

            # ----- insert padded data into new ndarray -----
            data_padded[:, idx_delay[i] : idx_delay[i + 1]] = data_tmp

        # ---------- last data slice ----------
        elif idx_delay[i] == idx_delay[-1]:
            xprint('*** last slice ***', kind='debug', verbosity=verbosity)
            # ----- get data slice -----
            data_tmp = data[:, idx_delay[i] : n_traces]
            xprint(f'data_tmp.shape: {data_tmp.shape}', kind='debug', verbosity=verbosity)

            # create array with samples to pad at the data top
            pad_top = np.zeros(
                [np.arange(min_delay, recording_delays[delay], dt).size, n_traces - idx_delay[i]]
            xprint(f'pad_top:    {pad_top.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, 0, pad_top, axis=0)

            # create array with samples to pad at the data bottom
            pad_bottom = np.zeros(
                [twt_padded.size - twt.size - pad_top.shape[0], n_traces - idx_delay[i]]
            xprint(f'pad_bottom:    {pad_bottom.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, data_tmp.shape[0], pad_bottom, axis=0)

            # ----- insert padded data into new ndarray -----
            data_padded[:, idx_delay[i] : n_traces] = data_tmp

    return data_padded, twt_padded, n_samples_padded, (idx_delay, min_delay, max_delay)

wrapper_delrt_padding_segy(in_path, args) #

Pad DelayRecordingTime delrt for single SEG-Y file.

Source code in pseudo_3D_interpolation\
def wrapper_delrt_padding_segy(in_path, args):  # noqa
    """Pad DelayRecordingTime `delrt` for single SEG-Y file."""
    basepath, filename = os.path.split(in_path)
    basename, suffix = os.path.splitext(filename)
    xprint(f'Processing file < {filename} >', kind='info', verbosity=args.verbose)

    default_txt_suffix = 'pad'

    if args.output_dir is None:  # default behavior
        xprint('Creating copy of file in INPUT directory:\n', basepath, kind='info', verbosity=args.verbose)
        out_dir = basepath
    elif args.output_dir is not None and os.path.isdir(args.output_dir):
        xprint('Creating copy of file in OUTPUT directory:\n', args.output_dir, kind='info', verbosity=args.verbose)
        out_dir = args.output_dir
        raise FileNotFoundError(f'The output directory > {args.output_dir} < does not exist')

    if args.txt_suffix is not None:
        out_name = f'{basename}_{args.txt_suffix}'
        out_name = f'{basename}_{default_txt_suffix}'
    out_path = os.path.join(out_dir, f'{out_name}{suffix}')

    # sanity check
    if os.path.isfile(out_path):
        xprint('Output file already exists and will be removed!', kind='warning', verbosity=args.verbose)

    # read SEGY file
    with, 'r', strict=False, ignore_geometry=True) as src:
        n_traces = src.tracecount  # total number of traces
        dt = / 1000  # sample rate [ms]
        # n_samples = src.samples.size        # total number of samples
        twt = src.samples  # two way travel time (TWTT) [ms]

        # get "DelayRecordingTime" value for each trace
        recording_delays = src.attributes(args.byte_delay)[:]
        # find unique DelayRecordingTimes and corresponding trace index
        recording_delays_uniq, recording_delays_idx = np.unique(
            recording_delays[:], return_index=True

        # check if padding is needed
        if len(recording_delays_uniq) > 1:
                f'Found < {len(recording_delays_uniq)} > different "DelayRecordingTimes" for file < {filename} >',
            return False

        # get seismic data [amplitude]; transpose to fit numpy data structure
        data = src.trace.raw[:].T  # eager version (completely read into memory)

        hns = src.bin[segyio.BinField.Samples]  # samples per trace

        # pad source data to generate continuous array without vertical offsets
        data_padded, twt_padded, n_samples_padded, delay_attrs = pad_trace_data(
            data, recording_delays, n_traces, dt, twt, args.verbose
        (idx_delay, min_delay, max_delay) = delay_attrs

        # get source metadata
        spec =
        spec.samples = twt_padded  # set padded samples

        # create new output SEG-Y file with updated header information
        xprint(f'Writing padded output file < {out_name}{suffix} >', kind='info', verbosity=args.verbose)
        with segyio.create(out_path, spec) as dst:
            dst.text[0] = src.text[0]  # copy textual header
            dst.bin = src.bin  # copy binary header
            dst.header = src.header  # copy trace headers
            dst.trace = np.ascontiguousarray(
                data_padded.T, dtype=data.dtype
            )  # set padded trace data

            # update binary header with padded sample count
            dst.bin[segyio.BinField.Samples] = n_samples_padded
            dst.bin[segyio.BinField.SamplesOriginal] = hns

            # update trace headers with padded sample count & new DelayRecordingTime
            for h in dst.header[:]:
                        segyio.TraceField.TRACE_SAMPLE_COUNT: n_samples_padded,
                        segyio.TraceField.DelayRecordingTime: min_delay,

        # update textual header
        text = get_textual_header(out_path)
        info = f'PAD DELRT (byte:{segyio.TraceField.DelayRecordingTime})'
        text_updated = add_processing_info_header(text, info, prefix='_TODAY_')
        write_textual_header(out_path, text_updated)

