Run ShakeTune as an in-process Klipper module (#100)
* feat: Run ShakeTune as an in-process Klipper module * feat: install ShakeTune dependencies into the Klipper venv * refactor: replace print_with_c_locale with Klipper console output (falling back to stdout)
This commit is contained in:
0
shaketune/helpers/__init__.py
Normal file
0
shaketune/helpers/__init__.py
Normal file
257
shaketune/helpers/common_func.py
Normal file
257
shaketune/helpers/common_func.py
Normal file
@@ -0,0 +1,257 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Common functions for the Shake&Tune package
|
||||
# Written by Frix_x#0161 #
|
||||
|
||||
import math
|
||||
import os
|
||||
import sys
|
||||
from importlib import import_module
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
from scipy.signal import spectrogram
|
||||
from .console_output import ConsoleOutput
|
||||
|
||||
|
||||
def parse_log(logname):
    """Load raw accelerometer data from a Klipper CSV log.

    Returns a (N, 4) numpy array of [time, accel_x, accel_y, accel_z] rows, or
    None when the file is a Klipper-processed PSD export, has an unexpected
    header/shape, or cannot be read at all (a warning is printed in each case).
    """
    try:
        with open(logname) as f:
            header = None
            for line in f:
                cleaned_line = line.strip()

                # Check for a PSD file generated by Klipper and raise a warning
                if cleaned_line.startswith('#freq,psd_x,psd_y,psd_z,psd_xyz'):
                    ConsoleOutput.print(
                        'Warning: %s does not contain raw accelerometer data. '
                        'Please use the official Klipper script to process it instead. '
                        'It will be ignored by Shake&Tune!' % (logname,)
                    )
                    return None

                # Check for the expected header for Shake&Tune (raw accelerometer data from Klipper)
                elif cleaned_line.startswith('#time,accel_x,accel_y,accel_z'):
                    header = cleaned_line
                    break

            if not header:
                # BUG FIX: header is None in this branch, so the previous
                # header.strip() raised AttributeError (silently swallowed by
                # the except below). Report the raw value instead.
                ConsoleOutput.print(
                    'Warning: file %s has an incorrect header and will be ignored by Shake&Tune!\n'
                    "Expected '#time,accel_x,accel_y,accel_z', but got '%s'." % (logname, header)
                )
                return None

            # If we have the correct raw data header, proceed to load the data
            data = np.loadtxt(logname, comments='#', delimiter=',', skiprows=1)
            if data.ndim == 1 or data.shape[1] != 4:
                ConsoleOutput.print(
                    'Warning: %s does not have the correct data format; expected 4 columns. '
                    'It will be ignored by Shake&Tune!' % (logname,)
                )
                return None

            return data

    except Exception as err:
        ConsoleOutput.print(f'Error while reading {logname}: {err}. It will be ignored by Shake&Tune!')
        return None
|
||||
|
||||
|
||||
def setup_klipper_import(kdir):
    """Make Klipper's klippy directory importable and return its shaper_calibrate module.

    kdir may contain '~'; it is expanded before being added to sys.path.
    """
    klippy_dir = os.path.join(os.path.expanduser(kdir), 'klippy')
    # Avoid growing sys.path with duplicate entries when called more than once
    if klippy_dir not in sys.path:
        sys.path.append(klippy_dir)
    return import_module('.shaper_calibrate', 'extras')
|
||||
|
||||
|
||||
# This is used to print the current S&T version on top of the png graph file
def get_git_version():
    """Best-effort lookup of the Shake&Tune version from its git repository.

    Returns the `git describe --tags` string, the short commit SHA when no tag
    exists, or None when the version cannot be determined at all.
    """
    try:
        # Resolve symlinks on this file's path, then go up two levels to
        # reach the repository root
        from git import GitCommandError, Repo

        repo = Repo(Path(__file__).resolve().parents[1])
        try:
            # Prefer a human-readable tag description
            return repo.git.describe('--tags')
        except GitCommandError:
            # No tag found: fall back to the abbreviated commit SHA
            return repo.head.commit.hexsha[:7]
    except Exception:
        # Not a git checkout, or GitPython unavailable: no version to report
        return None
|
||||
|
||||
|
||||
# This is Klipper's spectrogram generation function adapted to use Scipy
def compute_spectrogram(data):
    """Compute the summed X+Y+Z PSD spectrogram of raw accelerometer data.

    data is an (N, 4) array of [time, ax, ay, az] rows. Returns the tuple
    (pdata, t, f) where pdata is the per-bin power summed over the 3 axes.
    """
    nb_samples = data.shape[0]
    sample_rate = nb_samples / (data[-1, 0] - data[0, 0])
    # Round the window size up to a power of 2 for faster FFTs
    nperseg = 1 << int(0.5 * sample_rate - 1).bit_length()
    kaiser_window = np.kaiser(nperseg, 6.0)

    def _axis_spectrogram(samples):
        # One-sided PSD with 50% overlapping Kaiser windows (Klipper's settings)
        return spectrogram(
            samples,
            fs=sample_rate,
            window=kaiser_window,
            nperseg=nperseg,
            noverlap=nperseg // 2,
            detrend='constant',
            scaling='density',
            mode='psd',
        )

    # Sum the per-axis power bin by bin (x first, then accumulate y and z)
    freqs, times, pdata = _axis_spectrogram(data[:, 1])
    pdata += _axis_spectrogram(data[:, 2])[2]
    pdata += _axis_spectrogram(data[:, 3])[2]
    return pdata, times, freqs
|
||||
|
||||
|
||||
# Compute natural resonant frequency and damping ratio by using the half power bandwidth method with interpolated frequencies
def compute_mechanical_parameters(psd, freqs, min_freq=None):
    """Estimate (fr, zeta, peak_index, max_under_min_freq) from a PSD curve.

    fr is the resonant frequency (PSD maximum above min_freq), zeta the damping
    ratio from the -3dB bandwidth, peak_index the index of the maximum, and
    max_under_min_freq is True when the global maximum lies below min_freq.
    Entries are None when they cannot be computed.
    """
    max_under_min_freq = False

    if min_freq is None:
        min_freq_index = 0
    else:
        min_freq_index = np.searchsorted(freqs, min_freq, side='left')
        if min_freq_index >= len(freqs):
            return None, None, None, max_under_min_freq
        if np.argmax(psd) < min_freq_index:
            max_under_min_freq = True

    # Restrict the peak search to the part of the signal above min_freq
    search_region = psd[min_freq_index:]
    if len(search_region) == 0:
        return None, None, None, max_under_min_freq

    peak_index = int(np.argmax(search_region)) + min_freq_index
    fr = freqs[peak_index]
    peak_power = psd[peak_index]

    # Locate the half-power points (peak / sqrt(2)) on both sides of the peak
    half_power = peak_power / math.sqrt(2)
    below = np.where(psd[:peak_index] <= half_power)[0]
    above = np.where(psd[peak_index:] <= half_power)[0]

    # Without a crossing on each side, the bandwidth (and thus zeta) is undefined
    if len(below) == 0 or len(above) == 0:
        return fr, None, peak_index, max_under_min_freq

    idx_below = below[-1]
    idx_above = above[0] + peak_index

    # Linearly interpolate the exact half-power crossing frequencies
    freq_below_half_power = freqs[idx_below] + (half_power - psd[idx_below]) * (
        freqs[idx_below + 1] - freqs[idx_below]
    ) / (psd[idx_below + 1] - psd[idx_below])
    freq_above_half_power = freqs[idx_above - 1] + (half_power - psd[idx_above - 1]) * (
        freqs[idx_above] - freqs[idx_above - 1]
    ) / (psd[idx_above] - psd[idx_above - 1])

    bandwidth = freq_above_half_power - freq_below_half_power
    bw1 = math.pow(bandwidth / fr, 2)
    bw2 = math.pow(bandwidth / fr, 4)

    try:
        zeta = math.sqrt(0.5 - math.sqrt(1 / (4 + 4 * bw1 - bw2)))
    except ValueError:
        # A negative sqrt argument arises on noisy data: no damping ratio then
        return fr, None, peak_index, max_under_min_freq

    return fr, zeta, peak_index, max_under_min_freq
|
||||
|
||||
|
||||
# This finds all the peaks in a curve by looking at when the derivative term goes from positive to negative
# Then only the peaks found above a threshold are kept to avoid capturing peaks in the low amplitude noise of a signal
def detect_peaks(data, indices, detection_threshold, relative_height_threshold=None, window_size=5, vicinity=3):
    """Detect peaks in `data` and refine their positions on the raw curve.

    Returns (num_peaks, peak_positions, indices[peak_positions]) where
    peak_positions is an int array of indices into `data`.
    """
    # Smooth the curve using a moving average to avoid catching peaks everywhere in noisy signals
    kernel = np.ones(window_size) / window_size
    smoothed_data = np.convolve(data, kernel, mode='valid')
    mean_pad = [np.mean(data[:window_size])] * (window_size // 2)
    smoothed_data = np.concatenate((mean_pad, smoothed_data))

    # Find local maxima on the smoothed curve (sign change of the discrete derivative)
    smoothed_peaks = (
        np.where((smoothed_data[:-2] < smoothed_data[1:-1]) & (smoothed_data[1:-1] > smoothed_data[2:]))[0] + 1
    )
    smoothed_peaks = smoothed_peaks[smoothed_data[smoothed_peaks] > detection_threshold]

    # Additional validation for peaks based on their height relative to the local minimum
    valid_peaks = smoothed_peaks
    if relative_height_threshold is not None:
        valid_peaks = []
        for peak in smoothed_peaks:
            peak_height = smoothed_data[peak] - np.min(
                smoothed_data[max(0, peak - vicinity) : min(len(smoothed_data), peak + vicinity + 1)]
            )
            if peak_height > relative_height_threshold * smoothed_data[peak]:
                valid_peaks.append(peak)

    # Refine peak positions on the original (unsmoothed) curve
    refined_peaks = []
    for peak in valid_peaks:
        # BUG FIX: the argmax offset must be taken from the clamped window
        # start; the old `peak + argmax - vicinity` was off when the window
        # was clipped at the beginning of the data
        start = max(0, peak - vicinity)
        end = min(len(data), peak + vicinity + 1)
        refined_peaks.append(start + int(np.argmax(data[start:end])))

    # Use an integer dtype so an empty result can still fancy-index `indices`
    refined_peaks = np.array(refined_peaks, dtype=int)

    return len(refined_peaks), refined_peaks, indices[refined_peaks]
|
||||
|
||||
|
||||
# The goal is to find zones outside of the peaks (flat low energy zones) in a signal
def identify_low_energy_zones(power_total, detection_threshold=0.1):
    """Find contiguous low-energy zones in `power_total`.

    Returns a list of (start, end, mean_energy_percent) tuples sorted by
    ascending mean energy (expressed as a percentage of the signal maximum).
    """
    # Threshold: a "mean + 1/4 of the range" baseline, lowered by a fraction
    # of the standard deviation controlled by detection_threshold
    baseline = np.mean(power_total) + (np.max(power_total) - np.min(power_total)) / 4
    threshold_value = baseline - detection_threshold * np.std(power_total)

    # Scan the signal and record every contiguous run of samples below the threshold
    valleys = []
    in_valley = False
    start_idx = 0
    for i, value in enumerate(power_total):
        below = value < threshold_value
        if below and not in_valley:
            in_valley = True
            start_idx = i
        elif not below and in_valley:
            in_valley = False
            valleys.append((start_idx, i))

    # If the signal ends while still inside a valley, close it on the last sample
    if in_valley:
        valleys.append((start_idx, len(power_total) - 1))

    # Express each valley's mean energy as a percentage of the signal maximum
    max_signal = np.max(power_total)
    valley_means_percentage = []
    for start, end in valleys:
        mean_energy = np.mean(power_total[start:end])
        if not np.isnan(mean_energy):
            valley_means_percentage.append((start, end, (mean_energy / max_signal) * 100))

    # Lowest-energy zones first
    return sorted(valley_means_percentage, key=lambda v: v[2])
|
||||
|
||||
|
||||
# Calculate or estimate a "similarity" factor between two PSD curves and scale it to a percentage. This is
# used here to quantify how closely the two belt paths' behaviors and responses match each other.
def compute_curve_similarity_factor(x1, y1, x2, y2, sim_sigmoid_k=0.6):
    """Return a 0-100% similarity score between curves (x1, y1) and (x2, y2)."""
    # Resample the second curve onto the first curve's frequency bins and cross-correlate
    y2_resampled = np.interp(x1, x2, y2)
    cross_corr = np.correlate(y1, y2_resampled, mode='full')

    # Normalize the correlation peak by the energy of both signals
    correlation_peak = np.max(cross_corr)
    energy_norm = np.sqrt(np.sum(y1**2) * np.sum(y2_resampled**2))
    similarity = correlation_peak / energy_norm

    # Stretch the [0..1] similarity with -log and squash it to a 0-100% scale
    return sigmoid_scale(-np.log(1 - similarity), sim_sigmoid_k)
|
||||
|
||||
|
||||
# Simple helper to compute a sigmoid scaling (from 0 to 100%)
def sigmoid_scale(x, k=1):
    """Map x through a logistic sigmoid of steepness k, scaled to [0, 100]."""
    return 1 / (1 + np.exp(-k * x)) * 100
|
||||
24
shaketune/helpers/console_output.py
Normal file
24
shaketune/helpers/console_output.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import io
|
||||
from typing import Callable, Optional
|
||||
|
||||
|
||||
class ConsoleOutput:
    """
    Print output to stdout, or to an alternative sink such as the Klipper
    console via a callback registered at runtime
    """

    # Current sink; None means "use the builtin print / stdout"
    _output_func: Optional[Callable[[str], None]] = None

    @classmethod
    def register_output_callback(cls, output_func: Optional[Callable[[str], None]]):
        # Passing None restores the default stdout behavior
        cls._output_func = output_func

    @classmethod
    def print(cls, *args, **kwargs):
        callback = cls._output_func
        if not callback:
            # No sink registered: behave exactly like the builtin print
            print(*args, **kwargs)
        else:
            # Render the message in memory first, then hand the text to the sink
            with io.StringIO() as buffer:
                print(*args, file=buffer, **kwargs)
                callback(buffer.getvalue())
|
||||
38
shaketune/helpers/filemanager.py
Normal file
38
shaketune/helpers/filemanager.py
Normal file
@@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Common file management functions for the Shake&Tune package
|
||||
# Written by Frix_x#0161 #
|
||||
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def wait_file_ready(filepath: Path, timeout: int = 60) -> None:
    """Block until `filepath` can be opened for writing (i.e. Klipper released it).

    Polls once per second, up to `timeout` attempts. Raises TimeoutError when
    the file is still busy (or unreachable) after that.
    """
    for _ in range(timeout):
        # Try to open the file in write-only mode to check if it is in use:
        # if we successfully open and close it, it is not in use anymore
        try:
            os.close(os.open(filepath, os.O_WRONLY))
            # BUG FIX: return immediately instead of sleeping one extra
            # second after the file has been released
            return
        except OSError:
            # The file is still held by another process (or not created yet)
            pass
        time.sleep(1)
    raise TimeoutError(f'Klipper is taking too long to release the CSV file ({filepath})!')
|
||||
|
||||
|
||||
def ensure_folders_exist(folders: list[Path]) -> None:
    """Create every folder of the list (including parents) if it is missing."""
    for target in folders:
        target.mkdir(parents=True, exist_ok=True)
|
||||
205
shaketune/helpers/motorlogparser.py
Normal file
205
shaketune/helpers/motorlogparser.py
Normal file
@@ -0,0 +1,205 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Classes to parse the Klipper log and parse the TMC dump to extract the relevant information
|
||||
# Written by Frix_x#0161 #
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
|
||||
class Motor:
    """Holds the Klipper configuration and TMC register dump of one stepper motor."""

    def __init__(self, name: str):
        self._name: str = name
        self._registers: Dict[str, Dict[str, Any]] = {}
        self._properties: Dict[str, Any] = {}

    def set_register(self, register: str, value: Any) -> None:
        """Parse a raw register dump line into key/value pairs and store it."""
        # CHOPCONF needs extra massaging to extract meaningful values
        if register == 'CHOPCONF':
            # The dump omits intpol when disabled: make it explicit
            if 'intpol=' not in value:
                value += ' intpol=0'
            # Keep only the microstep count from the mres field
            mres_match = re.search(r'mres=\d+\((\d+)usteps\)', value)
            if mres_match:
                value = re.sub(r'mres=\d+\(\d+usteps\)', f'mres={mres_match.group(1)}', value)

        # PWMCONF fields are prefixed with pwm_: strip it to keep the keys short
        if register == 'PWMCONF':
            stripped_tokens = []
            for token in value.split():
                key, val = token.split('=', 1)
                if key.startswith('pwm_'):
                    key = key[4:]
                stripped_tokens.append(f'{key}={val}')
            value = ' '.join(stripped_tokens)

        # Drop extraneous "label: value " groups before parsing the pairs
        cleaned_values = re.sub(r'\b\w+:\s+\S+\s+', '', value)

        # Merge both threshold registers into a single THRS virtual register
        if register in ('TPWMTHRS', 'TCOOLTHRS'):
            self._registers['THRS'] = {
                **self._registers.get('THRS', {}),
                **self._parse_register_values(cleaned_values),
            }
        else:
            self._registers[register] = self._parse_register_values(cleaned_values)

    def _parse_register_values(self, register_string: str) -> Dict[str, Any]:
        # Turn "a=1 b=2" into {'a': '1', 'b': '2'}; tokens without '=' are ignored
        return dict(token.split('=', 1) for token in register_string.split() if '=' in token)

    def get_register(self, register: str) -> Optional[Dict[str, Any]]:
        return self._registers.get(register)

    def get_registers(self) -> Dict[str, Dict[str, Any]]:
        return self._registers

    def set_property(self, property: str, value: Any) -> None:
        self._properties[property] = value

    def get_property(self, property: str) -> Optional[Any]:
        return self._properties.get(property)

    def __str__(self):
        return f'Stepper: {self._name}\nKlipper config: {self._properties}\nTMC Registers: {self._registers}'

    # Return the other motor properties and registers that are different from the current motor
    def compare_to(self, other: 'Motor') -> Optional[Dict[str, Dict[str, Any]]]:
        differences = {'properties': {}, 'registers': {}}

        # Properties: keep the other motor's value wherever it differs
        for key in self._properties.keys() | other._properties.keys():
            if self._properties.get(key) != other._properties.get(key):
                differences['properties'][key] = other._properties.get(key)

        # Registers: compare field by field and keep only the differing fields
        for key in self._registers.keys() | other._registers.keys():
            reg1 = self._registers.get(key, {})
            reg2 = other._registers.get(key, {})
            if reg1 == reg2:
                continue
            reg_diffs = {
                sub_key: reg2.get(sub_key)
                for sub_key in reg1.keys() | reg2.keys()
                if reg1.get(sub_key) != reg2.get(sub_key)
            }
            if reg_diffs:
                differences['registers'][key] = reg_diffs

        # Drop empty sections so the result only reports actual differences
        if not differences['properties']:
            del differences['properties']
        if not differences['registers']:
            del differences['registers']

        return differences if differences else None
|
||||
|
||||
|
||||
class MotorLogParser:
    """Parses the Klipper log to extract the latest TMC register dump per stepper (x/y)."""

    # Matches the start of a DUMP_TMC section and captures the axis letter
    _section_pattern: str = r'DUMP_TMC stepper_(x|y)'
    _register_patterns: Dict[str, str] = {
        'CHOPCONF': r'CHOPCONF:\s+\S+\s+(.*)',
        'PWMCONF': r'PWMCONF:\s+\S+\s+(.*)',
        'COOLCONF': r'COOLCONF:\s+(.*)',
        'TPWMTHRS': r'TPWMTHRS:\s+\S+\s+(.*)',
        'TCOOLTHRS': r'TCOOLTHRS:\s+\S+\s+(.*)',
    }

    def __init__(self, filepath: Path, config_string: Optional[str] = None):
        """Parse `filepath` immediately; config_string is 'key:value|key:value|...'."""
        self._filepath = filepath

        self._motors: List[Motor] = []
        self._config = self._parse_config(config_string) if config_string else {}

        self._parse_registers()

    def _parse_config(self, config_string: str) -> Dict[str, Any]:
        """Parse the pipe-separated 'key:value' config string into a dict."""
        config = {}
        for entry in config_string.split('|'):
            if entry:
                # BUG FIX: split on the first ':' only, so a value containing
                # a colon no longer raises ValueError (too many values to unpack)
                key, value = entry.split(':', 1)
                config[key.strip()] = self._convert_value(value.strip())
        return config

    def _convert_value(self, value: str) -> Union[int, float, bool, str]:
        """Best-effort conversion of a config value to int, float or bool."""
        if value.isdigit():
            return int(value)
        try:
            return float(value)
        except ValueError:
            if value.lower() in ['true', 'false']:
                return value.lower() == 'true'
            return value

    def _parse_registers(self) -> None:
        """Read the log file and build one Motor object per detected stepper."""
        with open(self._filepath, 'r') as file:
            log_content = file.read()

        sections = re.split(self._section_pattern, log_content)

        # Detect only the latest dumps from the log (to ignore potential previous and outdated dumps)
        last_sections: Dict[str, int] = {}
        for i in range(1, len(sections), 2):
            stepper_name = 'stepper_' + sections[i].strip()
            last_sections[stepper_name] = i

        for stepper_name, index in last_sections.items():
            content = sections[index + 1]
            motor = Motor(stepper_name)

            # Apply general properties from the config string
            for key, value in self._config.items():
                if stepper_name in key:
                    prop_key = key.replace(stepper_name + '_', '')
                    motor.set_property(prop_key, value)
                elif 'autotune' in key:
                    motor.set_property(key, value)

            # Extract the TMC registers found in this section
            for key, pattern in self._register_patterns.items():
                match = re.search(pattern, content)
                if match:
                    motor.set_register(key, match.group(1).strip())

            self._motors.append(motor)

    # Find and return the motor by its name
    def get_motor(self, motor_name: str) -> Optional[Motor]:
        for motor in self._motors:
            if motor._name == motor_name:
                return motor
        return None

    # Get all the motor list at once
    def get_motors(self) -> List[Motor]:
        return self._motors
|
||||
|
||||
|
||||
# # Usage example:
|
||||
# config_string = "stepper_x_tmc:tmc2240|stepper_x_run_current:0.9|stepper_x_hold_current:0.9|stepper_y_tmc:tmc2240|stepper_y_run_current:0.9|stepper_y_hold_current:0.9|autotune_enabled:True|stepper_x_motor:ldo-35sth48-1684ah|stepper_x_voltage:|stepper_y_motor:ldo-35sth48-1684ah|stepper_y_voltage:|"
|
||||
# parser = MotorLogParser('/path/to/your/logfile.log', config_string)
|
||||
|
||||
# stepper_x = parser.get_motor('stepper_x')
|
||||
# stepper_y = parser.get_motor('stepper_y')
|
||||
|
||||
# print(stepper_x)
|
||||
# print(stepper_y)
|
||||
Reference in New Issue
Block a user