fully moved to Pathlib and global code improvements

This commit is contained in:
Félix Boisselier
2024-04-19 17:19:56 +02:00
parent 6184233b03
commit 94e110736a
8 changed files with 223 additions and 165 deletions

View File

@@ -10,7 +10,7 @@ import optparse
import numpy as np
from scipy.signal import butter, filtfilt
from locale_utils import print_with_c_locale
from helpers.locale_utils import print_with_c_locale
NUM_POINTS = 500

View File

@@ -20,14 +20,14 @@ from scipy.interpolate import griddata
matplotlib.use('Agg')
from common_func import (
from helpers.common_func import (
compute_curve_similarity_factor,
compute_spectrogram,
detect_peaks,
parse_log,
setup_klipper_import,
)
from locale_utils import print_with_c_locale, set_locale
from helpers.locale_utils import print_with_c_locale, set_locale
ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' # For paired peaks names

View File

@@ -20,14 +20,14 @@ import numpy as np
matplotlib.use('Agg')
from common_func import (
from helpers.common_func import (
compute_mechanical_parameters,
compute_spectrogram,
detect_peaks,
parse_log,
setup_klipper_import,
)
from locale_utils import print_with_c_locale, set_locale
from helpers.locale_utils import print_with_c_locale, set_locale
PEAKS_DETECTION_THRESHOLD = 0.05
PEAKS_EFFECT_THRESHOLD = 0.12

View File

@@ -21,14 +21,14 @@ import numpy as np
matplotlib.use('Agg')
from common_func import (
from helpers.common_func import (
compute_mechanical_parameters,
detect_peaks,
identify_low_energy_zones,
parse_log,
setup_klipper_import,
)
from locale_utils import print_with_c_locale, set_locale
from helpers.locale_utils import print_with_c_locale, set_locale
PEAKS_DETECTION_THRESHOLD = 0.05
PEAKS_RELATIVE_HEIGHT_THRESHOLD = 0.04

0
src/common_func.py → src/helpers/common_func.py Executable file → Normal file
View File

View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
# Common file management functions for the Shake&Tune package
# Written by Frix_x#0161 #
import time
from pathlib import Path
from is_workflow import Config
def wait_file_ready(filepath: Path) -> None:
file_busy = True
loop_count = 0
proc_path = Path('/proc')
while file_busy:
if loop_count > 60:
# If Klipper is taking too long to release the file (60 * 1s = 1min), raise an error
raise TimeoutError(f'Klipper is taking too long to release {filepath}!')
for proc in proc_path.iterdir():
if proc.name.isdigit():
fd_path = proc / 'fd'
if fd_path.exists():
for fd in fd_path.iterdir():
try:
# Using resolve to ensure symbolic links are followed
if fd.resolve(strict=False) == filepath:
pass # File is still being used by Klipper
except FileNotFoundError: # Klipper has already released the CSV file
file_busy = False
break
except PermissionError: # Unable to check for this particular process due to permissions
pass
loop_count += 1
time.sleep(1)
def ensure_folders_exist() -> None:
for subfolder in Config.RESULTS_SUBFOLDERS.values():
folder = Config.RESULTS_BASE_FOLDER / subfolder
Path(folder).mkdir(parents=True, exist_ok=True)

0
src/locale_utils.py → src/helpers/locale_utils.py Executable file → Normal file
View File

View File

@@ -11,35 +11,34 @@
import abc
import argparse
import glob
import os
import shutil
import tarfile
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
from git import GitCommandError, Repo
from matplotlib.figure import Figure
from analyze_axesmap import axesmap_calibration
from graph_belts import belts_calibration
from graph_shaper import shaper_calibration
from graph_vibrations import vibrations_profile
from locale_utils import print_with_c_locale
import helpers.filemanager as fm
from graph_creators.analyze_axesmap import axesmap_calibration
from graph_creators.graph_belts import belts_calibration
from graph_creators.graph_shaper import shaper_calibration
from graph_creators.graph_vibrations import vibrations_profile
from helpers.locale_utils import print_with_c_locale
class Config:
KLIPPER_FOLDER = os.path.expanduser('~/klipper')
RESULTS_BASE_FOLDER = os.path.expanduser('~/printer_data/config/K-ShakeTune_results')
KLIPPER_FOLDER = Path.home() / 'klipper'
RESULTS_BASE_FOLDER = Path.home() / 'printer_data/config/K-ShakeTune_results'
RESULTS_SUBFOLDERS = {'belts': 'belts', 'shaper': 'inputshaper', 'vibrations': 'vibrations'}
@staticmethod
def get_results_folder(type):
return os.path.join(Config.RESULTS_BASE_FOLDER, Config.RESULTS_SUBFOLDERS[type])
def get_results_folder(type: str) -> Path:
return Config.RESULTS_BASE_FOLDER / Config.RESULTS_SUBFOLDERS[type]
@staticmethod
def get_git_version():
def get_git_version() -> str:
try:
# Get the absolute path of the script, resolving any symlinks
# Then get 1 times to parent dir to be at the git root folder
@@ -56,7 +55,7 @@ class Config:
return 'unknown'
@staticmethod
def parse_arguments():
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(description='Shake&Tune graphs generation script')
parser.add_argument(
'-t',
@@ -125,39 +124,8 @@ class Config:
return parser.parse_args()
class FileManager:
@staticmethod
def wait_file_ready(filepath):
file_busy = True
loop_count = 0
while file_busy:
for proc in os.listdir('/proc'):
if proc.isdigit():
for fd in glob.glob(f'/proc/{proc}/fd/*'):
try:
if os.path.samefile(fd, filepath):
pass
except FileNotFoundError: # Klipper has already released the CSV file
file_busy = False
except PermissionError: # Unable to check for this particular process due to permissions
pass
if loop_count > 60:
# If Klipper is taking too long to release the file (60 * 1s = 1min), raise an error
raise TimeoutError(f'Klipper is taking too long to release {filepath}!')
else:
loop_count += 1
time.sleep(1)
return
@staticmethod
def ensure_folders_exist():
for subfolder in Config.RESULTS_SUBFOLDERS.values():
folder = os.path.join(Config.RESULTS_BASE_FOLDER, subfolder)
os.makedirs(folder, exist_ok=True)
class GraphCreator(abc.ABC):
def __init__(self, keep_csv, dpi):
def __init__(self, keep_csv: bool, dpi: int):
self._keep_csv = keep_csv
self._dpi = dpi
@@ -167,12 +135,18 @@ class GraphCreator(abc.ABC):
self._type = None
self._folder = None
def _setup_folder(self, graph_type):
def _setup_folder(self, graph_type: str) -> None:
self._type = graph_type
self._folder = Config.get_results_folder(graph_type)
def _move_and_prepare_files(self, glob_pattern, min_files_required, custom_name_func=None):
globbed_files = glob.glob(glob_pattern)
def _move_and_prepare_files(
self,
glob_pattern: str,
min_files_required: Optional[int],
custom_name_func: Optional[Callable[[Path], str]],
) -> list[Path]:
tmp_path = Path('/tmp')
globbed_files = list(tmp_path.glob(glob_pattern))
# If min_files_required is not set, use the number of globbed files as the minimum
min_files_required = min_files_required or len(globbed_files)
@@ -183,18 +157,18 @@ class GraphCreator(abc.ABC):
raise FileNotFoundError(f'{min_files_required} CSV files are needed to create the {self._type} graphs!')
lognames = []
for filename in sorted(globbed_files, key=os.path.getmtime, reverse=True)[:min_files_required]:
FileManager.wait_file_ready(filename)
custom_name = custom_name_func(filename) if custom_name_func else os.path.basename(filename)
new_file = os.path.join(self._folder, f'{self._type}_{self._graph_date}_{custom_name}.csv')
shutil.move(filename, new_file)
FileManager.wait_file_ready(new_file)
for filename in sorted(globbed_files, key=lambda f: f.stat().st_mtime, reverse=True)[:min_files_required]:
fm.wait_file_ready(filename)
custom_name = custom_name_func(filename) if custom_name_func else filename.name
new_file = self._folder / f'{self._type}_{self._graph_date}_{custom_name}.csv'
filename.rename(new_file)
fm.wait_file_ready(new_file)
lognames.append(new_file)
return lognames
def _save_figure_and_cleanup(self, fig, lognames, axis_label=None):
def _save_figure_and_cleanup(self, fig: Figure, lognames: list[Path], axis_label: Optional[str]) -> None:
axis_suffix = f'_{axis_label}' if axis_label else ''
png_filename = os.path.join(self._folder, f'{self._type}_{self._graph_date}{axis_suffix}.png')
png_filename = self._folder / f'{self._type}_{self._graph_date}{axis_suffix}.png'
fig.savefig(png_filename, dpi=self._dpi)
if self._keep_csv:
@@ -202,133 +176,157 @@ class GraphCreator(abc.ABC):
else:
self._remove_files(lognames)
def _archive_files(self, _):
def _archive_files(self, _: list[Path]) -> None:
return
def _remove_files(self, lognames):
def _remove_files(self, lognames: list[Path]) -> None:
for csv in lognames:
if os.path.exists(csv):
os.remove(csv)
csv.unlink(missing_ok=True)
@abc.abstractmethod
def create_graph(self):
def create_graph(self) -> None:
pass
@abc.abstractmethod
def clean_old_files(self, keep_results):
def clean_old_files(self, keep_results: int) -> None:
pass
class BeltsGraphCreator(GraphCreator):
def __init__(self, keep_csv=False, dpi=150):
def __init__(self, keep_csv: bool = False, dpi: int = 150):
super().__init__(keep_csv, dpi)
self._setup_folder('belts')
def create_graph(self):
def create_graph(self) -> None:
lognames = self._move_and_prepare_files(
glob_pattern='/tmp/raw_data_axis*.csv',
glob_pattern='raw_data_axis*.csv',
min_files_required=2,
custom_name_func=lambda f: os.path.basename(f).split('_')[3].split('.')[0].upper(),
custom_name_func=lambda f: f.stem.split('_')[3].upper(),
)
fig = belts_calibration(
lognames=[str(path) for path in lognames],
klipperdir=str(Config.KLIPPER_FOLDER),
st_version=self._version,
)
fig = belts_calibration(lognames, Config.KLIPPER_FOLDER, st_version=self._version)
self._save_figure_and_cleanup(fig, lognames)
def clean_old_files(self, keep_results=3):
folder = self._folder
files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.png')]
files.sort(key=os.path.getmtime, reverse=True)
def clean_old_files(self, keep_results: int = 3) -> None:
# Get all PNG files in the directory as a list of Path objects
files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
if len(files) <= keep_results:
return
else:
return # No need to delete any files
# Delete the older files
for old_file in files[keep_results:]:
file_date = '_'.join(os.path.splitext(os.path.basename(old_file))[0].split('_')[1:3])
file_date = '_'.join(old_file.stem.split('_')[1:3])
for suffix in ['A', 'B']:
csv_file = os.path.join(folder, f'belt_{file_date}_{suffix}.csv')
if os.path.exists(csv_file):
os.remove(csv_file)
os.remove(old_file)
csv_file = self._folder / f'belts_{file_date}_{suffix}.csv'
csv_file.unlink(missing_ok=True)
old_file.unlink()
class ShaperGraphCreator(GraphCreator):
def __init__(self, max_smoothing=None, scv=5.0, keep_csv=False, dpi=150):
def __init__(self, keep_csv: bool = False, dpi: int = 150):
super().__init__(keep_csv, dpi)
self._max_smoothing = max_smoothing
self._scv = scv
self._max_smoothing = None
self._scv = None
self._setup_folder('shaper')
def create_graph(self):
def configure(self, scv: float, max_smoothing: float = None) -> None:
self._scv = scv
self._max_smoothing = max_smoothing
def create_graph(self) -> None:
if not self._scv:
raise ValueError('scv must be set to create the input shaper graph!')
lognames = self._move_and_prepare_files(
glob_pattern='/tmp/raw_data*.csv',
glob_pattern='raw_data*.csv',
min_files_required=1,
custom_name_func=lambda f: os.path.basename(f).split('_')[3].split('.')[0].upper(),
custom_name_func=lambda f: f.stem.split('_')[3].upper(),
)
fig = shaper_calibration(
lognames, Config.KLIPPER_FOLDER, max_smoothing=self._max_smoothing, scv=self._scv, st_version=self._version
lognames=[str(path) for path in lognames],
klipperdir=str(Config.KLIPPER_FOLDER),
max_smoothing=self._max_smoothing,
scv=self._scv,
st_version=self._version,
)
self._save_figure_and_cleanup(fig, lognames, lognames[0].split('_')[-1].split('.')[0])
self._save_figure_and_cleanup(fig, lognames, lognames[0].stem.split('_')[-1])
def clean_old_files(self, keep_results=3):
folder = self._folder
files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.png')]
files.sort(key=os.path.getmtime, reverse=True)
def clean_old_files(self, keep_results: int = 3) -> None:
# Get all PNG files in the directory as a list of Path objects
files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
if len(files) <= 2 * keep_results:
return
else: # delete the older files
return # No need to delete any files
# Delete the older files
for old_file in files[2 * keep_results :]:
csv_file = os.path.join(folder, os.path.splitext(os.path.basename(old_file))[0] + '.csv')
if os.path.exists(csv_file):
os.remove(csv_file)
os.remove(old_file)
csv_file = old_file.with_suffix('.csv')
csv_file.unlink(missing_ok=True)
old_file.unlink()
class VibrationsGraphCreator(GraphCreator):
def __init__(self, kinematics, accel, chip_name, keep_csv=False, dpi=150):
def __init__(self, keep_csv: bool = False, dpi: int = 150):
super().__init__(keep_csv, dpi)
self._kinematics = None
self._accel = None
self._chip_name = None
self._setup_folder('vibrations')
def configure(self, kinematics: str, accel: float, chip_name: str) -> None:
self._kinematics = kinematics
self._accel = accel
self._chip_name = chip_name
self._setup_folder('vibrations')
def _archive_files(self, lognames):
with tarfile.open(os.path.join(self._folder, f'{self._type}_{self._graph_date}.tar.gz'), 'w:gz') as tar:
def _archive_files(self, lognames: list[Path]) -> None:
tar_path = self._folder / f'{self._type}_{self._graph_date}.tar.gz'
with tarfile.open(tar_path, 'w:gz') as tar:
for csv_file in lognames:
tar.add(csv_file, arcname=os.path.basename(csv_file), recursive=False)
tar.add(csv_file, arcname=csv_file.name, recursive=False)
def create_graph(self) -> None:
if not self._accel or not self._chip_name or not self._kinematics:
raise ValueError('accel, chip_name and kinematics must be set to create the vibrations profile graph!')
def create_graph(self):
lognames = self._move_and_prepare_files(
glob_pattern=f'/tmp/{self._chip_name}-*.csv',
glob_pattern=f'{self._chip_name}-*.csv',
min_files_required=None,
custom_name_func=lambda f: os.path.basename(f).replace(self._chip_name, self._type),
custom_name_func=lambda f: f.name.replace(self._chip_name, self._type),
)
fig = vibrations_profile(
lognames, Config.KLIPPER_FOLDER, kinematics=self._kinematics, accel=self._accel, st_version=self._version
lognames=[str(path) for path in lognames],
klipperdir=str(Config.KLIPPER_FOLDER),
kinematics=self._kinematics,
accel=self._accel,
st_version=self._version,
)
self._save_figure_and_cleanup(fig, lognames)
def clean_old_files(self, keep_results=3):
folder = self._folder
files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.png')]
files.sort(key=os.path.getmtime, reverse=True)
def clean_old_files(self, keep_results: int = 3) -> None:
# Get all PNG files in the directory as a list of Path objects
files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
if len(files) <= keep_results:
return
else: # delete the older files
return # No need to delete any files
# Delete the older files
for old_file in files[keep_results:]:
os.remove(old_file)
tar_file = os.path.join(folder, os.path.splitext(os.path.basename(old_file))[0] + '.tar.gz')
if os.path.exists(tar_file):
os.remove(tar_file)
old_file.unlink()
tar_file = old_file.with_suffix('.tar.gz')
tar_file.unlink(missing_ok=True)
class AxesMapFinder:
def __init__(self, accel, chip_name):
def __init__(self, accel: float, chip_name: str):
self._accel = accel
self._chip_name = chip_name
@@ -337,40 +335,57 @@ class AxesMapFinder:
self._type = 'axesmap'
self._folder = Config.RESULTS_BASE_FOLDER
def find_axesmap(self):
globbed_files = glob.glob(f'/tmp/{self._chip_name}-*.csv')
def find_axesmap(self) -> None:
tmp_folder = Path('/tmp')
globbed_files = list(tmp_folder.glob(f'{self._chip_name}-*.csv'))
if not globbed_files:
raise FileNotFoundError('no CSV files found in the /tmp folder to find the axes map!')
# Find the CSV files with the latest timestamp and wait for it to be released by Klipper
logname = sorted(globbed_files, key=os.path.getmtime, reverse=True)[0]
FileManager.wait_file_ready(logname)
logname = sorted(globbed_files, key=lambda f: f.stat().st_mtime, reverse=True)[0]
fm.wait_file_ready(logname)
results = axesmap_calibration([logname], self._accel)
result_filename = os.path.join(self._folder, f'{self._type}_{self._graph_date}.txt')
with open(result_filename, 'w') as f:
results = axesmap_calibration(
lognames=[str(logname)],
accel=self._accel,
)
result_filename = self._folder / f'{self._type}_{self._graph_date}.txt'
with result_filename.open('w') as f:
f.write(results)
def main():
options = Config.parse_arguments()
FileManager.ensure_folders_exist()
fm.ensure_folders_exist()
print_with_c_locale(f'Shake&Tune version: {Config.get_git_version()}')
graph_creator = None
if options.type == 'belts':
graph_creator = BeltsGraphCreator(options.keep_csv, options.dpi)
elif options.type == 'shaper':
graph_creator = ShaperGraphCreator(options.max_smoothing, options.scv, options.keep_csv, options.dpi)
elif options.type == 'vibrations':
graph_creator = VibrationsGraphCreator(
options.kinematics, options.accel_used, options.chip_name, options.keep_csv, options.dpi
)
elif options.type == 'axesmap':
graph_creator = AxesMapFinder(options.accel_used, options.chip_name)
graph_creators = {
'belts': (BeltsGraphCreator, None),
'shaper': (ShaperGraphCreator, lambda gc: gc.configure(options.scv, options.max_smoothing)),
'vibrations': (
VibrationsGraphCreator,
lambda gc: gc.configure(options.kinematics, options.accel_used, options.chip_name),
),
'axesmap': (AxesMapFinder, None),
}
if graph_creator:
creator_info = graph_creators.get(options.type)
if not creator_info:
print_with_c_locale('Error: invalid graph type specified!')
return
# Instantiate the graph creator
graph_creator_class, configure_func = creator_info
graph_creator = graph_creator_class(options.keep_csv, options.dpi)
# Configure it if needed
if configure_func:
configure_func(graph_creator)
# And then run it
try:
graph_creator.create_graph()
except FileNotFoundError as e: