belt differential spectrogram now show the impact of each belt with its own color
This commit is contained in:
@@ -205,67 +205,23 @@ def interpolate_2d(target_x, target_y, source_x, source_y, source_data):
|
|||||||
return interpolated_data
|
return interpolated_data
|
||||||
|
|
||||||
|
|
||||||
# This function identifies a "ridge" of high gradient magnitude in a spectrogram (pdata) - ie. a resonance diagonal line. Starting from
|
# Main logic function to combine two similar spectrogram - ie. from both belts paths - by substracting signals in order to create
|
||||||
# the maximum value in the first column, it iteratively follows the direction of the highest gradient in the vicinity (window configured using
|
# a new composite spectrogram. This result of a divergent but mostly centered new spectrogram (center will be white) with some colored zones
|
||||||
# the n_average parameter). The result is a sequence of indices that traces the resonance line across the original spectrogram.
|
# highlighting differences in the belts paths. The summative spectrogram is used for the MHI calculation.
|
||||||
def detect_ridge(pdata, n_average=3):
|
|
||||||
grad_y, grad_x = np.gradient(pdata)
|
|
||||||
magnitude = np.sqrt(grad_x**2 + grad_y**2)
|
|
||||||
|
|
||||||
# Start at the maximum value in the first column
|
|
||||||
start_idx = np.argmax(pdata[:, 0])
|
|
||||||
path = [start_idx]
|
|
||||||
|
|
||||||
for j in range(1, pdata.shape[1]):
|
|
||||||
start = max(0, path[-1]-n_average)
|
|
||||||
end = min(pdata.shape[0], path[-1]+n_average+1)
|
|
||||||
vicinity = magnitude[start:end, j]
|
|
||||||
next_idx = start + np.mean(np.argsort(vicinity)[-n_average:])
|
|
||||||
path.append(int(next_idx))
|
|
||||||
|
|
||||||
return np.array(path)
|
|
||||||
|
|
||||||
|
|
||||||
# This function calculates the time offset between two resonances lines (ridge1 and ridge2) using cross-correlation in
|
|
||||||
# the frequency domain (using FFT). The result provides the lag (or offset) at which the two sequences are most similar.
|
|
||||||
# This is used to re-align both belts spectrograms on their resonances lines in order to create the combined spectrogram.
|
|
||||||
def compute_cross_correlation_offset(ridge1, ridge2):
|
|
||||||
max_len = max(len(ridge1), len(ridge2))
|
|
||||||
ridge1 = np.pad(ridge1, (0, max_len - len(ridge1)), 'constant')
|
|
||||||
ridge2 = np.pad(ridge2, (0, max_len - len(ridge2)), 'constant')
|
|
||||||
|
|
||||||
cross_corr = np.fft.fftshift(np.fft.irfft(np.fft.rfft(ridge1) * np.conj(np.fft.rfft(ridge2))))
|
|
||||||
return np.argmax(np.abs(cross_corr)) - max_len // 2
|
|
||||||
|
|
||||||
|
|
||||||
# This function shifts data along its second dimension - ie. time here - by a specified shift_amount
|
|
||||||
def shift_data_in_time(data, shift_amount):
|
|
||||||
if shift_amount > 0:
|
|
||||||
return np.concatenate([np.zeros((data.shape[0], shift_amount)), data[:, :-shift_amount]], axis=1)
|
|
||||||
elif shift_amount < 0:
|
|
||||||
return np.concatenate([data[:, -shift_amount:], np.zeros((data.shape[0], -shift_amount))], axis=1)
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
# Main logic function to combine two similar spectrogram - ie. from both belts paths - by detecting similarities (ridges), computing
|
|
||||||
# the time lag and realigning them. Finally this function combine (by substracting signals) the aligned spectrograms in a new one.
|
|
||||||
# This result of a mostly zero-ed new spectrogram with some colored zones highlighting differences in the belts paths.
|
|
||||||
def combined_spectrogram(data1, data2):
|
def combined_spectrogram(data1, data2):
|
||||||
pdata1, bins1, t1 = compute_spectrogram(data1)
|
pdata1, bins1, t1 = compute_spectrogram(data1)
|
||||||
pdata2, bins2, t2 = compute_spectrogram(data2)
|
pdata2, bins2, t2 = compute_spectrogram(data2)
|
||||||
|
|
||||||
# Detect ridges
|
# Interpolate the spectrograms and apply logarithmic scaling
|
||||||
ridge1 = detect_ridge(pdata1)
|
pdata2_interpolated = interpolate_2d(bins1, t1, bins2, t2, pdata2)
|
||||||
ridge2 = detect_ridge(pdata2)
|
pdata1_log = np.log1p(pdata1 / np.max(pdata1))
|
||||||
|
pdata2_log = np.log1p(pdata2_interpolated / np.max(pdata2_interpolated))
|
||||||
|
|
||||||
# Compute offset using cross-correlation and shit/align and interpolate the spectrograms
|
# Combined spectrograms
|
||||||
offset = compute_cross_correlation_offset(ridge1, ridge2)
|
combined_sum = np.abs(pdata1 - pdata2_interpolated)
|
||||||
pdata2_aligned = shift_data_in_time(pdata2, offset)
|
combined_divergent = pdata1_log - pdata2_log
|
||||||
pdata2_interpolated = interpolate_2d(bins1, t1, bins2, t2, pdata2_aligned)
|
|
||||||
|
|
||||||
# Combine the spectrograms
|
return combined_sum, combined_divergent, bins1, t1
|
||||||
combined_data = np.abs(pdata1 - pdata2_interpolated)
|
|
||||||
return combined_data, bins1, t1
|
|
||||||
|
|
||||||
|
|
||||||
# Compute a composite and highly subjective value indicating the "mechanical health of the printer (0 to 100%)" that represent the
|
# Compute a composite and highly subjective value indicating the "mechanical health of the printer (0 to 100%)" that represent the
|
||||||
@@ -274,7 +230,8 @@ def combined_spectrogram(data1, data2):
|
|||||||
# This result in a percentage value quantifying the machine behavior around the main resonances that give an hint if only touching belt tension
|
# This result in a percentage value quantifying the machine behavior around the main resonances that give an hint if only touching belt tension
|
||||||
# will give good graphs or if there is a chance of mechanical issues in the background (above 50% should be considered as probably problematic)
|
# will give good graphs or if there is a chance of mechanical issues in the background (above 50% should be considered as probably problematic)
|
||||||
def compute_mhi(combined_data, similarity_coefficient, num_unpaired_peaks):
|
def compute_mhi(combined_data, similarity_coefficient, num_unpaired_peaks):
|
||||||
filtered_data = combined_data[combined_data > 100]
|
# filtered_data = combined_data[combined_data > 100]
|
||||||
|
filtered_data = np.abs(combined_data)
|
||||||
|
|
||||||
# First compute a "total variability metric" based on the sum of the gradient that sum the magnitude of will emphasize regions of the
|
# First compute a "total variability metric" based on the sum of the gradient that sum the magnitude of will emphasize regions of the
|
||||||
# spectrogram where there are rapid changes in magnitude (like the edges of resonance peaks).
|
# spectrogram where there are rapid changes in magnitude (like the edges of resonance peaks).
|
||||||
@@ -420,23 +377,23 @@ def plot_compare_frequency(ax, lognames, signal1, signal2, max_freq):
|
|||||||
|
|
||||||
|
|
||||||
def plot_difference_spectrogram(ax, data1, data2, signal1, signal2, similarity_factor, max_freq):
|
def plot_difference_spectrogram(ax, data1, data2, signal1, signal2, similarity_factor, max_freq):
|
||||||
combined_data, bins, t = combined_spectrogram(data1, data2)
|
combined_sum, combined_divergent, bins, t = combined_spectrogram(data1, data2)
|
||||||
# combined_data, bins, t = compute_spectrogram(data1)
|
|
||||||
|
|
||||||
# Compute the MHI value from the differential spectrogram sum of gradient, salted with
|
# Compute the MHI value from the differential spectrogram sum of gradient, salted with
|
||||||
# the similarity factor and the number or unpaired peaks from the belts frequency profile
|
# the similarity factor and the number or unpaired peaks from the belts frequency profile
|
||||||
# Be careful, this value is highly opinionated and is pretty experimental!
|
# Be careful, this value is highly opinionated and is pretty experimental!
|
||||||
mhi, textual_mhi = compute_mhi(combined_data, similarity_factor, len(signal1.unpaired_peaks) + len(signal2.unpaired_peaks))
|
mhi, textual_mhi = compute_mhi(combined_sum, similarity_factor, len(signal1.unpaired_peaks) + len(signal2.unpaired_peaks))
|
||||||
print(f"[experimental] Mechanical Health Indicator: {textual_mhi.lower()} ({mhi:.1f}%)")
|
print(f"[experimental] Mechanical Health Indicator: {textual_mhi.lower()} ({mhi:.1f}%)")
|
||||||
ax.set_title(f"Differential Spectrogram", fontsize=14, color=KLIPPAIN_COLORS['dark_orange'], weight='bold')
|
ax.set_title(f"Differential Spectrogram", fontsize=14, color=KLIPPAIN_COLORS['dark_orange'], weight='bold')
|
||||||
ax.plot([], [], ' ', label=f'{textual_mhi} (experimental)')
|
ax.plot([], [], ' ', label=f'{textual_mhi} (experimental)')
|
||||||
|
|
||||||
# Draw the differential spectrogram with a specific custom norm to get white or light orange zero values and red for max values
|
# Draw the differential spectrogram with a specific custom norm to get orange or purple values where there is signal or white near zeros
|
||||||
colors = ['white', 'bisque', 'red', 'black']
|
colors = [KLIPPAIN_COLORS['orange'], 'white', 'white', KLIPPAIN_COLORS['purple']]
|
||||||
n_bins = [0, 0.12, 0.9, 1] # These values where found experimentaly to get a good higlhlighting of the differences only
|
n_bins = [0, 0.3, 0.7, 1] # These values where found experimentaly to get a good higlhlighting of the differences only
|
||||||
cm = matplotlib.colors.LinearSegmentedColormap.from_list('WhiteToRed', list(zip(n_bins, colors)))
|
cm = matplotlib.colors.LinearSegmentedColormap.from_list('klippain_divergent', list(zip(n_bins, colors)))
|
||||||
norm = matplotlib.colors.Normalize(vmin=np.min(combined_data), vmax=np.max(combined_data))
|
# norm = matplotlib.colors.Normalize(vmin=np.min(combined_divergent), vmax=np.max(combined_divergent))
|
||||||
ax.pcolormesh(t, bins, combined_data.T, cmap=cm, norm=norm, shading='gouraud')
|
norm = matplotlib.colors.SymLogNorm(linthresh=0.01, vmin=np.min(combined_divergent), vmax=np.max(combined_divergent), base=10, clip=False)
|
||||||
|
ax.pcolormesh(t, bins, combined_divergent.T, cmap=cm, norm=norm, shading='gouraud')
|
||||||
|
|
||||||
ax.set_xlabel('Frequency (hz)')
|
ax.set_xlabel('Frequency (hz)')
|
||||||
ax.set_xlim([0., max_freq])
|
ax.set_xlim([0., max_freq])
|
||||||
@@ -450,16 +407,16 @@ def plot_difference_spectrogram(ax, data1, data2, signal1, signal2, similarity_f
|
|||||||
# Plot vertical lines for unpaired peaks
|
# Plot vertical lines for unpaired peaks
|
||||||
unpaired_peak_count = 0
|
unpaired_peak_count = 0
|
||||||
for _, peak in enumerate(signal1.unpaired_peaks):
|
for _, peak in enumerate(signal1.unpaired_peaks):
|
||||||
ax.axvline(signal1.freqs[peak], color='red', linestyle='dotted', linewidth=1.5)
|
ax.axvline(signal1.freqs[peak], color=KLIPPAIN_COLORS['red_pink'], linestyle='dotted', linewidth=1.5)
|
||||||
ax.annotate(f"Peak {unpaired_peak_count + 1}", (signal1.freqs[peak], t[-1]*0.05),
|
ax.annotate(f"Peak {unpaired_peak_count + 1}", (signal1.freqs[peak], t[-1]*0.05),
|
||||||
textcoords="data", color='red', rotation=90, fontsize=10,
|
textcoords="data", color=KLIPPAIN_COLORS['red_pink'], rotation=90, fontsize=10,
|
||||||
verticalalignment='bottom', horizontalalignment='right')
|
verticalalignment='bottom', horizontalalignment='right')
|
||||||
unpaired_peak_count +=1
|
unpaired_peak_count +=1
|
||||||
|
|
||||||
for _, peak in enumerate(signal2.unpaired_peaks):
|
for _, peak in enumerate(signal2.unpaired_peaks):
|
||||||
ax.axvline(signal2.freqs[peak], color='red', linestyle='dotted', linewidth=1.5)
|
ax.axvline(signal2.freqs[peak], color=KLIPPAIN_COLORS['red_pink'], linestyle='dotted', linewidth=1.5)
|
||||||
ax.annotate(f"Peak {unpaired_peak_count + 1}", (signal2.freqs[peak], t[-1]*0.05),
|
ax.annotate(f"Peak {unpaired_peak_count + 1}", (signal2.freqs[peak], t[-1]*0.05),
|
||||||
textcoords="data", color='red', rotation=90, fontsize=10,
|
textcoords="data", color=KLIPPAIN_COLORS['red_pink'], rotation=90, fontsize=10,
|
||||||
verticalalignment='bottom', horizontalalignment='right')
|
verticalalignment='bottom', horizontalalignment='right')
|
||||||
unpaired_peak_count +=1
|
unpaired_peak_count +=1
|
||||||
|
|
||||||
@@ -468,11 +425,11 @@ def plot_difference_spectrogram(ax, data1, data2, signal1, signal2, similarity_f
|
|||||||
label = ALPHABET[idx]
|
label = ALPHABET[idx]
|
||||||
x_min = min(peak1[1], peak2[1])
|
x_min = min(peak1[1], peak2[1])
|
||||||
x_max = max(peak1[1], peak2[1])
|
x_max = max(peak1[1], peak2[1])
|
||||||
ax.axvline(x_min, color=KLIPPAIN_COLORS['purple'], linestyle='dotted', linewidth=1.5)
|
ax.axvline(x_min, color=KLIPPAIN_COLORS['dark_purple'], linestyle='dotted', linewidth=1.5)
|
||||||
ax.axvline(x_max, color=KLIPPAIN_COLORS['purple'], linestyle='dotted', linewidth=1.5)
|
ax.axvline(x_max, color=KLIPPAIN_COLORS['dark_purple'], linestyle='dotted', linewidth=1.5)
|
||||||
ax.fill_between([x_min, x_max], 0, np.max(combined_data), color=KLIPPAIN_COLORS['purple'], alpha=0.3)
|
ax.fill_between([x_min, x_max], 0, np.max(combined_divergent), color=KLIPPAIN_COLORS['dark_purple'], alpha=0.3)
|
||||||
ax.annotate(f"Peaks {label}", (x_min, t[-1]*0.05),
|
ax.annotate(f"Peaks {label}", (x_min, t[-1]*0.05),
|
||||||
textcoords="data", color=KLIPPAIN_COLORS['purple'], rotation=90, fontsize=10,
|
textcoords="data", color=KLIPPAIN_COLORS['dark_purple'], rotation=90, fontsize=10,
|
||||||
verticalalignment='bottom', horizontalalignment='right')
|
verticalalignment='bottom', horizontalalignment='right')
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user