Skip to content

delrt_padding_segy.py#

Check for vertical offsets in SEG-Y file(s) and apply zero padding. For this, the script uses the trace header keyword DelayRecordingTime and pads seismic traces with zeros to compensate variable recording starting times.

pad_trace_data(data, recording_delays, n_traces, dt, twt, verbosity=0) #

Pad seismic trace data recorded in window mode with a fixed length and variable DelayRecordingTimes. This function reads the DelayRecordingTime from the trace headers and pads the data at top and bottom with zeros.

Parameters:

  • data (ndarray) –

    Amplitude array with samples (rows) and traces (columns). Transposed array read by segyio for numpy compatibility.

  • recording_delays (ndarray) –

    Array of DelayRecordingTimes for each trace in SEG-Y file [ms].

  • n_traces (int) –

    Number of traces in SEG-Y file.

  • dt (float) –

    Sampling interval [ms].

  • twt (ndarray) –

    Two way travel times (TWTT) of every sample in a trace.

Returns:

  • data_padded ( ndarray ) –

    Padded input data array in time dimension.

  • twt_padded ( ndarray ) –

    Updated TWTT that fit the expanded time dimension.

  • n_samples_padded ( int ) –

    New number of traces in SEG-Y file.

  • idx_delay ( ndarray ) –

    Trace indices where DelayRecordingTime changes.

  • min_delay ( int ) –

    Minimum DelayRecordingTime in SEG-Y file [ms].

  • max_delay ( int ) –

    Maximum DelayRecordingTime in SEG-Y file [ms].

Source code in pseudo_3D_interpolation\delrt_padding_segy.py
def pad_trace_data(data, recording_delays, n_traces, dt, twt, verbosity=0):
    """
    Pad seismic trace data recorded in window mode with a fixed length and variable `DelayRecordingTimes`.
    This function reads the DelayRecordingTime from the trace headers and pads the data at top and bottom with zeros.

    Parameters
    ----------
    data : numpy.ndarray
        Amplitude array with samples (rows) and traces (columns).
        Transposed array read by segyio for numpy compatibility.
    recording_delays : numpy.ndarray
        Array of DelayRecordingTimes for each trace in SEG-Y file [ms].
    n_traces : int
        Number of traces in SEG-Y file.
    dt : float
        Sampling interval [ms].
    twt : numpy.ndarray
        Two way travel times (TWTT) of every sample in a trace.

    Returns
    -------
    data_padded : numpy.ndarray
        Padded input data array in time dimension.
    twt_padded : numpy.ndarray
        Updated TWTT that fit the expanded time dimension.
    n_samples_padded : int
        New number of traces in SEG-Y file.
    idx_delay : numpy.ndarray
        Trace indices where DelayRecordingTime changes.
    min_delay : int
        Minimum DelayRecordingTime in SEG-Y file [ms].
    max_delay : int
        Maximum DelayRecordingTime in SEG-Y file [ms].

    """
    # find indices where DelayRecordingTime changes
    idx_delay = np.where(np.roll(recording_delays, 1) != recording_delays)[0]
    # sanity check: in case first and last trace of file have same DelayRecordingTime!
    if idx_delay[0] != 0:
        idx_delay = np.insert(idx_delay, 0, 0, axis=0)

    # minimum and maximum DelayRecordingTime in SEG-Y
    min_delay = recording_delays.min()
    max_delay = recording_delays.max()

    # pad twt array from minimum delay to maximum delay + data window size
    twt_padded = np.arange(min_delay, max_delay + (twt[-1] - twt[0]) + dt, dt)
    n_samples_padded = len(twt_padded)

    # initialize padded data array
    data_padded = np.ndarray((len(twt_padded), n_traces))
    # make sure array is contiguous in memory (C order)
    data_padded = np.ascontiguousarray(data_padded, dtype=data.dtype)

    for i, delay in enumerate(idx_delay):
        xprint(f'{i}: {recording_delays[delay]} ms', kind='debug', verbosity=verbosity)
        # ---------- first data slices ----------
        if idx_delay[i] != idx_delay[-1]:
            # ----- get data slice -----
            data_tmp = data[:, idx_delay[i] : idx_delay[i + 1]]
            data_len = data_tmp.shape[0]
            xprint(f'data_tmp.shape: {data_tmp.shape}', kind='debug', verbosity=verbosity)

            # create array with samples to pad at the data top
            pad_top = np.zeros(
                [
                    np.arange(min_delay, recording_delays[delay], dt).size,
                    idx_delay[i + 1] - idx_delay[i],
                ]
            )
            xprint(f'pad_top:    {pad_top.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, 0, pad_top, axis=0)

            # create array with samples to pad at the data bottom
            pad_bottom = np.zeros(
                [twt_padded.size - data_len - pad_top.shape[0], idx_delay[i + 1] - idx_delay[i]]
            )
            xprint(f'pad_bottom:    {pad_bottom.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, data_tmp.shape[0], pad_bottom, axis=0)

            # ----- insert padded data into new ndarray -----
            data_padded[:, idx_delay[i] : idx_delay[i + 1]] = data_tmp

        # ---------- last data slice ----------
        elif idx_delay[i] == idx_delay[-1]:
            xprint('*** last slice ***', kind='debug', verbosity=verbosity)
            # ----- get data slice -----
            data_tmp = data[:, idx_delay[i] : n_traces]
            xprint(f'data_tmp.shape: {data_tmp.shape}', kind='debug', verbosity=verbosity)

            # create array with samples to pad at the data top
            pad_top = np.zeros(
                [np.arange(min_delay, recording_delays[delay], dt).size, n_traces - idx_delay[i]]
            )
            xprint(f'pad_top:    {pad_top.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, 0, pad_top, axis=0)

            # create array with samples to pad at the data bottom
            pad_bottom = np.zeros(
                [twt_padded.size - twt.size - pad_top.shape[0], n_traces - idx_delay[i]]
            )
            xprint(f'pad_bottom:    {pad_bottom.shape}', kind='debug', verbosity=verbosity)
            data_tmp = np.insert(data_tmp, data_tmp.shape[0], pad_bottom, axis=0)

            # ----- insert padded data into new ndarray -----
            data_padded[:, idx_delay[i] : n_traces] = data_tmp

    return data_padded, twt_padded, n_samples_padded, (idx_delay, min_delay, max_delay)

wrapper_delrt_padding_segy(in_path, args) #

Pad DelayRecordingTime delrt for single SEG-Y file.

Source code in pseudo_3D_interpolation\delrt_padding_segy.py
def wrapper_delrt_padding_segy(in_path, args):  # noqa
    """Pad DelayRecordingTime `delrt` for single SEG-Y file."""
    basepath, filename = os.path.split(in_path)
    basename, suffix = os.path.splitext(filename)
    xprint(f'Processing file < {filename} >', kind='info', verbosity=args.verbose)

    default_txt_suffix = 'pad'

    if args.output_dir is None:  # default behavior
        xprint('Creating copy of file in INPUT directory:\n', basepath, kind='info', verbosity=args.verbose)
        out_dir = basepath
    elif args.output_dir is not None and os.path.isdir(args.output_dir):
        xprint('Creating copy of file in OUTPUT directory:\n', args.output_dir, kind='info', verbosity=args.verbose)
        out_dir = args.output_dir
    else:
        raise FileNotFoundError(f'The output directory > {args.output_dir} < does not exist')

    if args.txt_suffix is not None:
        out_name = f'{basename}_{args.txt_suffix}'
    else:
        out_name = f'{basename}_{default_txt_suffix}'
    out_path = os.path.join(out_dir, f'{out_name}{suffix}')

    # sanity check
    if os.path.isfile(out_path):
        xprint('Output file already exists and will be removed!', kind='warning', verbosity=args.verbose)
        os.remove(out_path)

    # read SEGY file
    with segyio.open(in_path, 'r', strict=False, ignore_geometry=True) as src:
        n_traces = src.tracecount  # total number of traces
        dt = segyio.tools.dt(src) / 1000  # sample rate [ms]
        # n_samples = src.samples.size        # total number of samples
        twt = src.samples  # two way travel time (TWTT) [ms]

        # get "DelayRecordingTime" value for each trace
        recording_delays = src.attributes(args.byte_delay)[:]
        # find unique DelayRecordingTimes and corresponding trace index
        recording_delays_uniq, recording_delays_idx = np.unique(
            recording_delays[:], return_index=True
        )

        # check if padding is needed
        if len(recording_delays_uniq) > 1:
            xprint(
                f'Found < {len(recording_delays_uniq)} > different "DelayRecordingTimes" for file < {filename} >',
                kind='info',
                verbosity=args.verbose,
            )
        else:
            return False

        # get seismic data [amplitude]; transpose to fit numpy data structure
        data = src.trace.raw[:].T  # eager version (completely read into memory)

        hns = src.bin[segyio.BinField.Samples]  # samples per trace

        # pad source data to generate continuous array without vertical offsets
        data_padded, twt_padded, n_samples_padded, delay_attrs = pad_trace_data(
            data, recording_delays, n_traces, dt, twt, args.verbose
        )
        (idx_delay, min_delay, max_delay) = delay_attrs

        # get source metadata
        spec = segyio.tools.metadata(src)
        spec.samples = twt_padded  # set padded samples

        # create new output SEG-Y file with updated header information
        xprint(f'Writing padded output file < {out_name}{suffix} >', kind='info', verbosity=args.verbose)
        with segyio.create(out_path, spec) as dst:
            dst.text[0] = src.text[0]  # copy textual header
            dst.bin = src.bin  # copy binary header
            dst.header = src.header  # copy trace headers
            dst.trace = np.ascontiguousarray(
                data_padded.T, dtype=data.dtype
            )  # set padded trace data

            # update binary header with padded sample count
            dst.bin[segyio.BinField.Samples] = n_samples_padded
            dst.bin[segyio.BinField.SamplesOriginal] = hns

            # update trace headers with padded sample count & new DelayRecordingTime
            for h in dst.header[:]:
                h.update(
                    {
                        segyio.TraceField.TRACE_SAMPLE_COUNT: n_samples_padded,
                        segyio.TraceField.DelayRecordingTime: min_delay,
                    }
                )

        # update textual header
        text = get_textual_header(out_path)
        info = f'PAD DELRT (byte:{segyio.TraceField.DelayRecordingTime})'
        text_updated = add_processing_info_header(text, info, prefix='_TODAY_')
        write_textual_header(out_path, text_updated)

Last update: Monday, 03 July 2023 at 09:46:51