# Copyright Eric Urban 2024 # http://www.hydrogen18.com/blog # Available under CC BY 4.0 # see details here: https://creativecommons.org/licenses/by/4.0/legalcode.en # # runs using Python3.10 # dependencies # numpy - 1.21.5 # scipy - 1.8.0 # matplotlib - 3.5.1 import os import sys import math import struct import wave import numpy import scipy import scipy.signal import random import matplotlib.pyplot as plt from dataclasses import dataclass class SegmentInvalidException(Exception): pass @dataclass(frozen = True) class Segment: start_time : float end_time : float def validate(self): if self.start_time <= 0.0 and self.end_time < 0.0: raise SegmentInvalidException("segment cannot cover entire sample range") if self.start_time == self.end_time: raise SegmentInvalidException("segment cannot have zero length") if self.end_time > 0 and self.end_time < self.start_time: raise SegmentInvalidException("segment cannot have end time before start_time") SILENCE_SEGMENTS = [ ('CanopyUnlocked_left.wav', [Segment(start_time=1.26, end_time=2.53), Segment(start_time=3.92,end_time=-1.0)]), ('CG-Off_left.wav', [Segment(start_time=0.0, end_time=0.29), Segment(start_time=1.33,end_time=2.49), Segment(start_time=3.71,end_time=-1.0)]), ('CheckForEngineFire_left.wav', [Segment(start_time=0.0, end_time=0.215), Segment(start_time=1.84, end_time=3.10), Segment(start_time=4.88,end_time=-1.0)]), ('EngineOilQuantityLow_left.wav', [Segment(start_time=0.0, end_time=0.33), Segment(start_time=2.15, end_time=3.22),Segment(start_time=5.16,end_time=-1.0)]), ('GeneratorAbornmal_left.wav',[Segment(start_time=0.0, end_time=0.3), Segment(start_time=1.8,end_time=2.8), Segment(start_time=4.33,end_time=-1.0)]), ('HydraulicSystemFailure_left.wav',[Segment(start_time=0.0, end_time=0.39), Segment(start_time=1.96, end_time=2.96), Segment(start_time=4.55, end_time=-1.0) ]), ('LeftManifoldPressureLow_left.wav',[Segment(start_time=0.0, end_time=0.19), Segment(start_time=2.05, end_time=2.71), Segment(start_time=4.64, end_time=-1.0)]), ('LowAirFlow_left.wav',[Segment(start_time=0.0, end_time=0.349), Segment(start_time=1.75, end_time=2.61), Segment(start_time=4.2, end_time=-1.0)]), ('PrimaryHydraulicPumpFailed_left.wav',[Segment(start_time=0.0, end_time=0.29), Segment(start_time=2.615, end_time=3.4), Segment(start_time=5.81, end_time=-1.0)]), ('ResTankNotFull_left.wav',[Segment(start_time=0.0, end_time=0.29), Segment(start_time=1.98, end_time=2.98), Segment(start_time=4.61, end_time=-1.0)]), ('RightManifoldPressureLow_left.wav',[Segment(start_time=0.0, end_time=0.23), Segment(start_time=2.15, end_time=3.0), Segment(start_time=4.89, end_time=-1.0)]), ('SWESS-AltitudeLow_left.wav',[Segment(start_time=0.0, end_time=0.29), Segment(start_time=1.58,end_time=2.70),Segment(start_time=4.1, end_time=-1.0)]), ('UtilityHydraulicPumpFailed_left.wav',[Segment(start_time=0.0, end_time=0.24), Segment(start_time=2.55, end_time=3.3), Segment(start_time=5.6, end_time=-1.0)]), ('WeaponUnlocked_left.wav',[Segment(start_time=0.0, end_time=0.17), Segment(start_time=1.349, end_time=2.53), Segment(start_time=3.68, end_time=-.10)]), ] for ss in SILENCE_SEGMENTS: filename, segments = ss for i, ss in enumerate(segments): print("validating %r segment #%d" % (filename, i+1)) ss.validate() PLOT_DPI = 600 SAVE_PLOTS = int(os.environ.get('SAVE_PLOTS', '0')) > 0 low_pass_filter_start_hz = 2850.0 filter_type = 'butterworth' high_pass_filter_start_hz = 260 for ss in SILENCE_SEGMENTS: filename, segments = ss basefilename, _ = filename.split('.', 2) with wave.open(filename, 'rb') as win: sample_rate = win.getframerate() frame_cnt = win.getnframes() width = win.getsampwidth() if width != 2: raise RuntimeError("input file %s not using 2 byte samples" % (filename,)) duration = frame_cnt / sample_rate print("input file %s has sample rate %d and contains %d samples over %.2f seconds" % (filename,sample_rate, frame_cnt, duration,)) audio_data = win.readframes(frame_cnt) noise_spectrum_size = -1 noise_bandstop_pairs = [] if len(segments) == 0: continue for i, segment in enumerate(segments): start_frame = int(segment.start_time * sample_rate) end_frame = int(segment.end_time * sample_rate) if segment.end_time < 0.0: end_frame = frame_cnt sample_cnt = end_frame - start_frame segment_data = audio_data[start_frame * width: end_frame * width] # compute a number of samples large enough to hold the data that is available exp = math.ceil(math.log(sample_cnt)/math.log(2)) required_length = 2**exp print("silence segment %d is %d bytes, %d frames, extending to %d" % (i+1, len(segment_data),sample_cnt,required_length)) clip = [] for j in range(sample_cnt): # read each S16LE as a float sample_raw, = struct.unpack(' (ch+1): condensed_bandstop_pairs.append((cl, ch,)) current_pair = entry continue current_pair = (cl, h,) condensed_bandstop_pairs.append(current_pair) # include last pair del bandstop_pairs merged_bandstop_pairs = [] current_pair = None for m, entry in enumerate(condensed_bandstop_pairs): if m == 0: current_pair = entry continue l, h = entry cl, ch = current_pair current_bandwidth = ch - cl next_bandwidth = h - l gap = l - ch if gap > max(current_bandwidth, next_bandwidth)/2.0 and gap > 20: merged_bandstop_pairs.append(current_pair) current_pair = entry continue current_pair = (cl, h,) merged_bandstop_pairs.append(current_pair) # include last pair del condensed_bandstop_pairs peak = -2.0 peak_bin = -1 for m ,v in enumerate(fft_result_abs): if v > peak: peak_bin = m peak = v num_bins = len(fft_result) # normalize the value t = numpy.arange(0.0, 1.0, 0.01) s = 1 + numpy.sin(2 * numpy.pi * t) fig, ax = plt.subplots() show_bins_cnt = math.ceil(low_pass_filter_start_hz / bin_width_hz) truncated_fft_result_abs = [x / len(fft_result_abs) for x in fft_result_abs[:show_bins_cnt]] ax.plot(list(x * bin_width_hz for x in range(show_bins_cnt)), truncated_fft_result_abs) for n, (start_hz, end_hz) in enumerate(merged_bandstop_pairs): bw = end_hz - start_hz center_hz = start_hz + (bw/2.0) ax.bar(center_hz, max(truncated_fft_result_abs), bw, color =(1.0,0.0,0.0,0.33)) ax.set(xlabel='Freq (Hz)', ylabel='magnitude', title='%s silence segment %d' %(filename,i+1,)) ax.grid() if SAVE_PLOTS: fig.savefig("%s_silence_segment_%d.png" % (basefilename,i+1,), dpi=PLOT_DPI) fig.clear() print("peak is at bin %d (freq:%.1f Hz) (value:%.6f)" % (peak_bin,peak_bin * bin_width_hz, peak/num_bins,)) if noise_spectrum_size < sample_cnt: noise_spectrum_size = sample_cnt noise_bandstop_pairs = merged_bandstop_pairs # use the computed noise spectrum to subtract noise from the main signal clip = [] for j in range(frame_cnt): # read each S16LE as a float sample_raw, = struct.unpack(' -20.0: print("bandstopping %r lowest:%r highest:%r SKIPPED" % (entry, min(attenuation_in_db), max(attenuation_in_db))) continue else: print("bandstopping %r lowest:%r highest:%r" % (entry, min(attenuation_in_db), max(attenuation_in_db))) # apply the filter # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.sosfilt.html clip = scipy.signal.sosfilt(dfilter, clip) fig = plt.figure() ax = fig.add_subplot(1, 1, 1) ax.set_title("bandstop filter %d %d Hz-%d Hz " % (i, entry[0], entry[1],)) ax.semilogx(w, attenuation_in_db) ax.set_xlabel('Frequency [Hz]') ax.set_ylabel('Amplitude [dB]') ax.grid(which='both',axis='both') ax.axis((10,4000, -100, 10)) if SAVE_PLOTS: fig.savefig("%s_digital_filter_%d.png" % (basefilename,i), dpi=PLOT_DPI) fig.clear() # apply a gain segment gain_lo_hz = 610 gain_hi_hz = 710 dfilter = scipy.signal.iirfilter(N=13, Wn=(gain_lo_hz, gain_hi_hz), analog=False, btype='bandpass', output='sos', fs=sample_rate, ftype=filter_type) bandpass_filtered_clip = scipy.signal.sosfilt(dfilter, clip) for i, v in enumerate(bandpass_filtered_clip): # Note - this could overflow in some circumstances but that doesn't happen in practice it seems clip[i] += v*4 del bandpass_filtered_clip # apply a high pass filter dfilter = scipy.signal.iirfilter(N=3, Wn=high_pass_filter_start_hz, analog=False, btype='highpass',output='sos', fs=sample_rate, ftype=filter_type) clip = scipy.signal.sosfilt(dfilter, clip) # apply a low pass filter dfilter = scipy.signal.iirfilter(N=3, Wn=low_pass_filter_start_hz, analog=False, btype='lowpass',output='sos', fs=sample_rate, ftype=filter_type) clip = scipy.signal.sosfilt(dfilter, clip) drop_off_sample_count = 0.25 * sample_rate rng = random.Random(1337) for i, segment in enumerate(segments): start_frame = int(segment.start_time * sample_rate) end_frame = int(segment.end_time * sample_rate) if segment.end_time < 0.0: end_frame = frame_cnt delta = end_frame - start_frame taper_at_end = end_frame == frame_cnt ramp_up_at_start = start_frame == 0 random_v = 0.125 + rng.random() * 0.125 # reduce amplitudes by -6 to -12 if ramp_up_at_start: print("applying initial volume ramp up") for j in range(start_frame, end_frame): dist = end_frame - j if dist < drop_off_sample_count: clip[j] *= math.sin(math.pi/2.0 * dist/drop_off_sample_count) else: clip[j] *= random_v elif taper_at_end: print("applying ending volume ramp down") for j in range(start_frame, end_frame): dist = j - start_frame if dist < drop_off_sample_count: clip[j] *= math.cos(math.pi/2.0 * dist/drop_off_sample_count) else: clip[j] *= random_v else: print("applying mid clip ramp down then up") for j in range(start_frame, end_frame): dist = j - start_frame if dist < drop_off_sample_count: clip[j] *= math.cos(math.pi/2.0 * dist/drop_off_sample_count) elif dist > (end_frame - drop_off_sample_count): dist = end_frame - j clip[j] *= math.sin(math.pi/2.0 * dist/drop_off_sample_count) else: clip[j] *= random_v # convert back to audio file restored_samples = clip # remove zero padding restored_samples = restored_samples[0:frame_cnt] clip = bytearray(len(restored_samples)*width) output_filename = '%s_restored.wav' % (basefilename,) with wave.open(output_filename, mode='wb') as wout: wout.setnchannels(1) wout.setframerate(sample_rate) wout.setsampwidth(width) wout.setnframes(len(restored_samples)) for i, v in enumerate(restored_samples): if v > 1.0: v_raw = 2**15 - 1 elif v < -1.0: v_raw = -1 * (2**15 - 1) else: v_raw = round(v * 2**15) # print("v: %r; v_raw:%r" % (v, v_raw,)) struct.pack_into('