fixed #7 and code optimizations

This commit is contained in:
Félix Boisselier
2023-12-10 06:40:08 +01:00
parent 8721488d8c
commit 7050018274
2 changed files with 59 additions and 92 deletions

View File

@@ -12,9 +12,9 @@
##################################################################### #####################################################################
import optparse, matplotlib, sys, importlib, os import optparse, matplotlib, sys, importlib, os
from textwrap import wrap
from collections import namedtuple from collections import namedtuple
import numpy as np import numpy as np
import scipy
import matplotlib.pyplot, matplotlib.dates, matplotlib.font_manager import matplotlib.pyplot, matplotlib.dates, matplotlib.font_manager
import matplotlib.ticker, matplotlib.gridspec, matplotlib.colors import matplotlib.ticker, matplotlib.gridspec, matplotlib.colors
import matplotlib.patches import matplotlib.patches
@@ -165,73 +165,42 @@ def pair_peaks(peaks1, freqs1, psd1, peaks2, freqs2, psd2):
def compute_spectrogram(data): def compute_spectrogram(data):
N = data.shape[0] N = data.shape[0]
Fs = N / (data[-1,0] - data[0,0]) Fs = N / (data[-1, 0] - data[0, 0])
# Round up to a power of 2 for faster FFT # Round up to a power of 2 for faster FFT
M = 1 << int(.5 * Fs - 1).bit_length() M = 1 << int(.5 * Fs - 1).bit_length()
window = np.kaiser(M, 6.) window = np.kaiser(M, 6.)
def _specgram(x):
return matplotlib.mlab.specgram(
x, Fs=Fs, NFFT=M, noverlap=M//2, window=window,
mode='psd', detrend='mean', scale_by_freq=False)
d = {'x': data[:,1], 'y': data[:,2], 'z': data[:,3]} def _specgram(x):
pdata, bins, t = _specgram(d['x']) x_detrended = x - np.mean(x) # Detrending by subtracting the mean value
for ax in 'yz': return scipy.signal.spectrogram(
pdata += _specgram(d[ax])[0] x_detrended, fs=Fs, window=window, nperseg=M, noverlap=M//2,
return pdata, bins, t detrend='constant', scaling='density', mode='psd')
d = {'x': data[:, 1], 'y': data[:, 2], 'z': data[:, 3]}
f, t, pdata = _specgram(d['x'])
for axis in 'yz':
pdata += _specgram(d[axis])[2]
return pdata, t, f
###################################################################### ######################################################################
# Computation of the differential spectrogram # Computation of the differential spectrogram
###################################################################### ######################################################################
# Performs a standard bilinear interpolation for a given x, y point based on surrounding input grid values. This function # Interpolate source_data (2D) to match target_x and target_y in order to
# is part of the logic to re-align both belts spectrogram in order to combine them in the differential spectrogram.
def bilinear_interpolate(x, y, points, values):
x1, x2 = points[0]
y1, y2 = points[1]
f11, f12 = values[0]
f21, f22 = values[1]
interpolated_value = (
(f11 * (x2 - x) * (y2 - y) +
f21 * (x - x1) * (y2 - y) +
f12 * (x2 - x) * (y - y1) +
f22 * (x - x1) * (y - y1)) / ((x2 - x1) * (y2 - y1))
)
return interpolated_value
# Interpolate source_data (2D) to match target_x and target_y in order to interpolate and
# get similar time and frequency dimensions for the differential spectrogram # get similar time and frequency dimensions for the differential spectrogram
def interpolate_2d(target_x, target_y, source_x, source_y, source_data): def interpolate_2d(target_x, target_y, source_x, source_y, source_data):
interpolated_data = np.zeros((len(target_y), len(target_x))) # Create a grid of points in the source and target space
source_points = np.array([(x, y) for y in source_y for x in source_x])
for i, y in enumerate(target_y): target_points = np.array([(x, y) for y in target_y for x in target_x])
for j, x in enumerate(target_x):
# Find indices of surrounding points in source data
# and ensure we don't exceed array bounds
x_indices = np.searchsorted(source_x, x) - 1
y_indices = np.searchsorted(source_y, y) - 1
x_indices = max(0, min(len(source_x) - 1, x_indices))
y_indices = max(0, min(len(source_y) - 1, y_indices))
if x_indices == len(source_x) - 2: # Flatten the source data to match the flattened source points
x_indices -= 1 source_values = source_data.flatten()
if y_indices == len(source_y) - 2:
y_indices -= 1 # Interpolate and reshape the interpolated data to match the target grid shape and replace NaN with zeros
interpolated_data = scipy.interpolate.griddata(source_points, source_values, target_points, method='nearest')
x1, x2 = source_x[x_indices], source_x[x_indices + 1] interpolated_data = interpolated_data.reshape((len(target_y), len(target_x)))
y1, y2 = source_y[y_indices], source_y[y_indices + 1] interpolated_data = np.nan_to_num(interpolated_data)
f11 = source_data[y_indices, x_indices]
f12 = source_data[y_indices, x_indices + 1]
f21 = source_data[y_indices + 1, x_indices]
f22 = source_data[y_indices + 1, x_indices + 1]
interpolated_data[i, j] = bilinear_interpolate(x, y, ((x1, x2), (y1, y2)), ((f11, f12), (f21, f22)))
return interpolated_data return interpolated_data
@@ -242,21 +211,18 @@ def interpolate_2d(target_x, target_y, source_x, source_y, source_data):
def detect_ridge(pdata, n_average=3): def detect_ridge(pdata, n_average=3):
grad_y, grad_x = np.gradient(pdata) grad_y, grad_x = np.gradient(pdata)
magnitude = np.sqrt(grad_x**2 + grad_y**2) magnitude = np.sqrt(grad_x**2 + grad_y**2)
# Start at the maximum value in the first column # Start at the maximum value in the first column
start_idx = np.argmax(pdata[:, 0]) start_idx = np.argmax(pdata[:, 0])
path = [start_idx] path = [start_idx]
# Walk through the spectrogram following the path of the ridge
for j in range(1, pdata.shape[1]): for j in range(1, pdata.shape[1]):
# Look in the vicinity of the previous point start = max(0, path[-1]-n_average)
vicinity = magnitude[max(0, path[-1]-n_average):min(pdata.shape[0], path[-1]+n_average+1), j] end = min(pdata.shape[0], path[-1]+n_average+1)
# Take an average of top few points vicinity = magnitude[start:end, j]
sorted_indices = np.argsort(vicinity) next_idx = start + np.mean(np.argsort(vicinity)[-n_average:])
top_indices = sorted_indices[-n_average:] path.append(int(next_idx))
next_idx = int(np.mean(top_indices) + max(0, path[-1]-n_average))
path.append(next_idx)
return np.array(path) return np.array(path)
@@ -264,24 +230,21 @@ def detect_ridge(pdata, n_average=3):
# the frequency domain (using FFT). The result provides the lag (or offset) at which the two sequences are most similar. # the frequency domain (using FFT). The result provides the lag (or offset) at which the two sequences are most similar.
# This is used to re-align both belts spectrograms on their resonances lines in order to create the combined spectrogram. # This is used to re-align both belts spectrograms on their resonances lines in order to create the combined spectrogram.
def compute_cross_correlation_offset(ridge1, ridge2): def compute_cross_correlation_offset(ridge1, ridge2):
# Ensure that the two arrays have the same shape max_len = max(len(ridge1), len(ridge2))
if len(ridge1) < len(ridge2): ridge1 = np.pad(ridge1, (0, max_len - len(ridge1)), 'constant')
ridge1 = np.pad(ridge1, (0, len(ridge2) - len(ridge1))) ridge2 = np.pad(ridge2, (0, max_len - len(ridge2)), 'constant')
elif len(ridge1) > len(ridge2):
ridge2 = np.pad(ridge2, (0, len(ridge1) - len(ridge2)))
cross_corr = np.fft.fftshift(np.fft.ifft(np.fft.fft(ridge1) * np.conj(np.fft.fft(ridge2)))) cross_corr = np.fft.fftshift(np.fft.irfft(np.fft.rfft(ridge1) * np.conj(np.fft.rfft(ridge2))))
return np.argmax(np.abs(cross_corr)) - len(ridge1) // 2 return np.argmax(np.abs(cross_corr)) - max_len // 2
# This function shifts data along its second dimension - ie. time here - by a specified shift_amount # This function shifts data along its second dimension - ie. time here - by a specified shift_amount
def shift_data_in_time(data, shift_amount): def shift_data_in_time(data, shift_amount):
if shift_amount > 0: if shift_amount > 0:
return np.pad(data, ((0, 0), (shift_amount, 0)), mode='constant')[:, :-shift_amount] return np.concatenate([np.zeros((data.shape[0], shift_amount)), data[:, :-shift_amount]], axis=1)
elif shift_amount < 0: elif shift_amount < 0:
return np.pad(data, ((0, 0), (0, -shift_amount)), mode='constant')[:, -shift_amount:] return np.concatenate([data[:, -shift_amount:], np.zeros((data.shape[0], -shift_amount))], axis=1)
else: return data
return data
# Main logic function to combine two similar spectrogram - ie. from both belts paths - by detecting similarities (ridges), computing # Main logic function to combine two similar spectrogram - ie. from both belts paths - by detecting similarities (ridges), computing
@@ -289,7 +252,7 @@ def shift_data_in_time(data, shift_amount):
# This result of a mostly zero-ed new spectrogram with some colored zones highlighting differences in the belts paths. # This result of a mostly zero-ed new spectrogram with some colored zones highlighting differences in the belts paths.
def combined_spectrogram(data1, data2): def combined_spectrogram(data1, data2):
pdata1, bins1, t1 = compute_spectrogram(data1) pdata1, bins1, t1 = compute_spectrogram(data1)
pdata2, _, _ = compute_spectrogram(data2) pdata2, bins2, t2 = compute_spectrogram(data2)
# Detect ridges # Detect ridges
ridge1 = detect_ridge(pdata1) ridge1 = detect_ridge(pdata1)
@@ -298,7 +261,7 @@ def combined_spectrogram(data1, data2):
# Compute offset using cross-correlation and shit/align and interpolate the spectrograms # Compute offset using cross-correlation and shit/align and interpolate the spectrograms
offset = compute_cross_correlation_offset(ridge1, ridge2) offset = compute_cross_correlation_offset(ridge1, ridge2)
pdata2_aligned = shift_data_in_time(pdata2, offset) pdata2_aligned = shift_data_in_time(pdata2, offset)
pdata2_interpolated = interpolate_2d(t1, bins1, t1, bins1, pdata2_aligned) pdata2_interpolated = interpolate_2d(bins1, t1, bins2, t2, pdata2_aligned)
# Combine the spectrograms # Combine the spectrograms
combined_data = np.abs(pdata1 - pdata2_interpolated) combined_data = np.abs(pdata1 - pdata2_interpolated)
@@ -458,6 +421,7 @@ def plot_compare_frequency(ax, lognames, signal1, signal2, max_freq):
def plot_difference_spectrogram(ax, data1, data2, signal1, signal2, similarity_factor, max_freq): def plot_difference_spectrogram(ax, data1, data2, signal1, signal2, similarity_factor, max_freq):
combined_data, bins, t = combined_spectrogram(data1, data2) combined_data, bins, t = combined_spectrogram(data1, data2)
# combined_data, bins, t = compute_spectrogram(data1)
# Compute the MHI value from the differential spectrogram sum of gradient, salted with # Compute the MHI value from the differential spectrogram sum of gradient, salted with
# the similarity factor and the number or unpaired peaks from the belts frequency profile # the similarity factor and the number or unpaired peaks from the belts frequency profile
@@ -472,12 +436,12 @@ def plot_difference_spectrogram(ax, data1, data2, signal1, signal2, similarity_f
n_bins = [0, 0.12, 0.9, 1] # These values where found experimentaly to get a good higlhlighting of the differences only n_bins = [0, 0.12, 0.9, 1] # These values where found experimentaly to get a good higlhlighting of the differences only
cm = matplotlib.colors.LinearSegmentedColormap.from_list('WhiteToRed', list(zip(n_bins, colors))) cm = matplotlib.colors.LinearSegmentedColormap.from_list('WhiteToRed', list(zip(n_bins, colors)))
norm = matplotlib.colors.Normalize(vmin=np.min(combined_data), vmax=np.max(combined_data)) norm = matplotlib.colors.Normalize(vmin=np.min(combined_data), vmax=np.max(combined_data))
ax.pcolormesh(bins, t, combined_data.T, cmap=cm, norm=norm, shading='gouraud') ax.pcolormesh(t, bins, combined_data.T, cmap=cm, norm=norm, shading='gouraud')
ax.set_xlabel('Frequency (hz)') ax.set_xlabel('Frequency (hz)')
ax.set_xlim([0., max_freq]) ax.set_xlim([0., max_freq])
ax.set_ylabel('Time (s)') ax.set_ylabel('Time (s)')
ax.set_ylim([0, t[-1]]) ax.set_ylim([0, bins[-1]])
fontP = matplotlib.font_manager.FontProperties() fontP = matplotlib.font_manager.FontProperties()
fontP.set_size('medium') fontP.set_size('medium')

View File

@@ -17,6 +17,7 @@
import optparse, matplotlib, sys, importlib, os, math import optparse, matplotlib, sys, importlib, os, math
from textwrap import wrap from textwrap import wrap
import numpy as np import numpy as np
import scipy
import matplotlib.pyplot, matplotlib.dates, matplotlib.font_manager import matplotlib.pyplot, matplotlib.dates, matplotlib.font_manager
import matplotlib.ticker, matplotlib.gridspec import matplotlib.ticker, matplotlib.gridspec
import locale import locale
@@ -99,20 +100,22 @@ def compute_damping_ratio(psd, freqs):
def compute_spectrogram(data): def compute_spectrogram(data):
N = data.shape[0] N = data.shape[0]
Fs = N / (data[-1,0] - data[0,0]) Fs = N / (data[-1, 0] - data[0, 0])
# Round up to a power of 2 for faster FFT # Round up to a power of 2 for faster FFT
M = 1 << int(.5 * Fs - 1).bit_length() M = 1 << int(.5 * Fs - 1).bit_length()
window = np.kaiser(M, 6.) window = np.kaiser(M, 6.)
def _specgram(x):
return matplotlib.mlab.specgram(
x, Fs=Fs, NFFT=M, noverlap=M//2, window=window,
mode='psd', detrend='mean', scale_by_freq=False)
d = {'x': data[:,1], 'y': data[:,2], 'z': data[:,3]} def _specgram(x):
pdata, bins, t = _specgram(d['x']) x_detrended = x - np.mean(x) # Detrending by subtracting the mean value
for ax in 'yz': return scipy.signal.spectrogram(
pdata += _specgram(d[ax])[0] x_detrended, fs=Fs, window=window, nperseg=M, noverlap=M//2,
return pdata, bins, t detrend='constant', scaling='density', mode='psd')
d = {'x': data[:, 1], 'y': data[:, 2], 'z': data[:, 3]}
f, t, pdata = _specgram(d['x'])
for axis in 'yz':
pdata += _specgram(d[axis])[2]
return pdata, t, f
# This find all the peaks in a curve by looking at when the derivative term goes from positive to negative # This find all the peaks in a curve by looking at when the derivative term goes from positive to negative
@@ -270,7 +273,7 @@ def plot_spectrogram(ax, data, peaks, max_freq):
vmin_value = np.percentile(pdata, SPECTROGRAM_LOW_PERCENTILE_FILTER) vmin_value = np.percentile(pdata, SPECTROGRAM_LOW_PERCENTILE_FILTER)
ax.set_title("Time-Frequency Spectrogram", fontsize=14, color=KLIPPAIN_COLORS['dark_orange'], weight='bold') ax.set_title("Time-Frequency Spectrogram", fontsize=14, color=KLIPPAIN_COLORS['dark_orange'], weight='bold')
ax.pcolormesh(bins, t, pdata.T, norm=matplotlib.colors.LogNorm(vmin=vmin_value), ax.pcolormesh(t, bins, pdata.T, norm=matplotlib.colors.LogNorm(vmin=vmin_value),
cmap='inferno', shading='gouraud') cmap='inferno', shading='gouraud')
# Add peaks lines in the spectrogram to get hint from peaks found in the first graph # Add peaks lines in the spectrogram to get hint from peaks found in the first graph