From 94e110736a15d2162a23382197749487ab44c054 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Boisselier?= Date: Fri, 19 Apr 2024 17:19:56 +0200 Subject: [PATCH] fully moved to Pathlib and global code improvements --- src/{ => graph_creators}/analyze_axesmap.py | 2 +- src/{ => graph_creators}/graph_belts.py | 4 +- src/{ => graph_creators}/graph_shaper.py | 4 +- src/{ => graph_creators}/graph_vibrations.py | 4 +- src/{ => helpers}/common_func.py | 0 src/helpers/filemanager.py | 43 +++ src/{ => helpers}/locale_utils.py | 0 src/is_workflow.py | 331 ++++++++++--------- 8 files changed, 223 insertions(+), 165 deletions(-) rename src/{ => graph_creators}/analyze_axesmap.py (99%) mode change 100755 => 100644 rename src/{ => graph_creators}/graph_belts.py (99%) mode change 100755 => 100644 rename src/{ => graph_creators}/graph_shaper.py (99%) mode change 100755 => 100644 rename src/{ => graph_creators}/graph_vibrations.py (99%) mode change 100755 => 100644 rename src/{ => helpers}/common_func.py (100%) mode change 100755 => 100644 create mode 100644 src/helpers/filemanager.py rename src/{ => helpers}/locale_utils.py (100%) mode change 100755 => 100644 diff --git a/src/analyze_axesmap.py b/src/graph_creators/analyze_axesmap.py old mode 100755 new mode 100644 similarity index 99% rename from src/analyze_axesmap.py rename to src/graph_creators/analyze_axesmap.py index faea83b..bcf5538 --- a/src/analyze_axesmap.py +++ b/src/graph_creators/analyze_axesmap.py @@ -10,7 +10,7 @@ import optparse import numpy as np from scipy.signal import butter, filtfilt -from locale_utils import print_with_c_locale +from helpers.locale_utils import print_with_c_locale NUM_POINTS = 500 diff --git a/src/graph_belts.py b/src/graph_creators/graph_belts.py old mode 100755 new mode 100644 similarity index 99% rename from src/graph_belts.py rename to src/graph_creators/graph_belts.py index 70a797d..a070c41 --- a/src/graph_belts.py +++ b/src/graph_creators/graph_belts.py @@ -20,14 +20,14 @@ from scipy.interpolate import griddata matplotlib.use('Agg') -from common_func import ( +from helpers.common_func import ( compute_curve_similarity_factor, compute_spectrogram, detect_peaks, parse_log, setup_klipper_import, ) -from locale_utils import print_with_c_locale, set_locale +from helpers.locale_utils import print_with_c_locale, set_locale ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' # For paired peaks names diff --git a/src/graph_shaper.py b/src/graph_creators/graph_shaper.py old mode 100755 new mode 100644 similarity index 99% rename from src/graph_shaper.py rename to src/graph_creators/graph_shaper.py index dfebbd8..5ae23a8 --- a/src/graph_shaper.py +++ b/src/graph_creators/graph_shaper.py @@ -20,14 +20,14 @@ import numpy as np matplotlib.use('Agg') -from common_func import ( +from helpers.common_func import ( compute_mechanical_parameters, compute_spectrogram, detect_peaks, parse_log, setup_klipper_import, ) -from locale_utils import print_with_c_locale, set_locale +from helpers.locale_utils import print_with_c_locale, set_locale PEAKS_DETECTION_THRESHOLD = 0.05 PEAKS_EFFECT_THRESHOLD = 0.12 diff --git a/src/graph_vibrations.py b/src/graph_creators/graph_vibrations.py old mode 100755 new mode 100644 similarity index 99% rename from src/graph_vibrations.py rename to src/graph_creators/graph_vibrations.py index d32acf8..3931d9f --- a/src/graph_vibrations.py +++ b/src/graph_creators/graph_vibrations.py @@ -21,14 +21,14 @@ import numpy as np matplotlib.use('Agg') -from common_func import ( +from helpers.common_func import ( compute_mechanical_parameters, detect_peaks, identify_low_energy_zones, parse_log, setup_klipper_import, ) -from locale_utils import print_with_c_locale, set_locale +from helpers.locale_utils import print_with_c_locale, set_locale PEAKS_DETECTION_THRESHOLD = 0.05 PEAKS_RELATIVE_HEIGHT_THRESHOLD = 0.04 diff --git a/src/common_func.py b/src/helpers/common_func.py old mode 100755 new mode 100644 similarity index 100% rename from src/common_func.py rename to src/helpers/common_func.py diff --git a/src/helpers/filemanager.py b/src/helpers/filemanager.py new file mode 100644 index 0000000..ba872a7 --- /dev/null +++ b/src/helpers/filemanager.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +# Common file management functions for the Shake&Tune package +# Written by Frix_x#0161 # + +import time +from pathlib import Path + +from is_workflow import Config + + +def wait_file_ready(filepath: Path) -> None: + file_busy = True + loop_count = 0 + proc_path = Path('/proc') + while file_busy: + if loop_count > 60: + # If Klipper is taking too long to release the file (60 * 1s = 1min), raise an error + raise TimeoutError(f'Klipper is taking too long to release {filepath}!') + + for proc in proc_path.iterdir(): + if proc.name.isdigit(): + fd_path = proc / 'fd' + if fd_path.exists(): + for fd in fd_path.iterdir(): + try: + # Using resolve to ensure symbolic links are followed + if fd.resolve(strict=False) == filepath: + pass # File is still being used by Klipper + except FileNotFoundError: # Klipper has already released the CSV file + file_busy = False + break + except PermissionError: # Unable to check for this particular process due to permissions + pass + + loop_count += 1 + time.sleep(1) + + +def ensure_folders_exist() -> None: + for subfolder in Config.RESULTS_SUBFOLDERS.values(): + folder = Config.RESULTS_BASE_FOLDER / subfolder + Path(folder).mkdir(parents=True, exist_ok=True) diff --git a/src/locale_utils.py b/src/helpers/locale_utils.py old mode 100755 new mode 100644 similarity index 100% rename from src/locale_utils.py rename to src/helpers/locale_utils.py diff --git a/src/is_workflow.py b/src/is_workflow.py index 6ea88c8..bd75457 100755 --- a/src/is_workflow.py +++ b/src/is_workflow.py @@ -11,35 +11,34 @@ import abc import argparse -import glob -import os -import shutil import tarfile -import time import traceback from datetime import datetime from pathlib import Path +from typing import Callable, Optional from git import GitCommandError, Repo +from matplotlib.figure import Figure -from analyze_axesmap import axesmap_calibration -from graph_belts import belts_calibration -from graph_shaper import shaper_calibration -from graph_vibrations import vibrations_profile -from locale_utils import print_with_c_locale +import helpers.filemanager as fm +from graph_creators.analyze_axesmap import axesmap_calibration +from graph_creators.graph_belts import belts_calibration +from graph_creators.graph_shaper import shaper_calibration +from graph_creators.graph_vibrations import vibrations_profile +from helpers.locale_utils import print_with_c_locale class Config: - KLIPPER_FOLDER = os.path.expanduser('~/klipper') - RESULTS_BASE_FOLDER = os.path.expanduser('~/printer_data/config/K-ShakeTune_results') + KLIPPER_FOLDER = Path.home() / 'klipper' + RESULTS_BASE_FOLDER = Path.home() / 'printer_data/config/K-ShakeTune_results' RESULTS_SUBFOLDERS = {'belts': 'belts', 'shaper': 'inputshaper', 'vibrations': 'vibrations'} @staticmethod - def get_results_folder(type): - return os.path.join(Config.RESULTS_BASE_FOLDER, Config.RESULTS_SUBFOLDERS[type]) + def get_results_folder(type: str) -> Path: + return Config.RESULTS_BASE_FOLDER / Config.RESULTS_SUBFOLDERS[type] @staticmethod - def get_git_version(): + def get_git_version() -> str: try: # Get the absolute path of the script, resolving any symlinks # Then get 1 times to parent dir to be at the git root folder @@ -56,7 +55,7 @@ class Config: return 'unknown' @staticmethod - def parse_arguments(): + def parse_arguments() -> argparse.Namespace: parser = argparse.ArgumentParser(description='Shake&Tune graphs generation script') parser.add_argument( '-t', @@ -125,39 +124,8 @@ class Config: return parser.parse_args() -class FileManager: - @staticmethod - def wait_file_ready(filepath): - file_busy = True - loop_count = 0 - while file_busy: - for proc in os.listdir('/proc'): - if proc.isdigit(): - for fd in glob.glob(f'/proc/{proc}/fd/*'): - try: - if os.path.samefile(fd, filepath): - pass - except FileNotFoundError: # Klipper has already released the CSV file - file_busy = False - except PermissionError: # Unable to check for this particular process due to permissions - pass - if loop_count > 60: - # If Klipper is taking too long to release the file (60 * 1s = 1min), raise an error - raise TimeoutError(f'Klipper is taking too long to release {filepath}!') - else: - loop_count += 1 - time.sleep(1) - return - - @staticmethod - def ensure_folders_exist(): - for subfolder in Config.RESULTS_SUBFOLDERS.values(): - folder = os.path.join(Config.RESULTS_BASE_FOLDER, subfolder) - os.makedirs(folder, exist_ok=True) - - class GraphCreator(abc.ABC): - def __init__(self, keep_csv, dpi): + def __init__(self, keep_csv: bool, dpi: int): self._keep_csv = keep_csv self._dpi = dpi @@ -167,12 +135,18 @@ class GraphCreator(abc.ABC): self._type = None self._folder = None - def _setup_folder(self, graph_type): + def _setup_folder(self, graph_type: str) -> None: self._type = graph_type self._folder = Config.get_results_folder(graph_type) - def _move_and_prepare_files(self, glob_pattern, min_files_required, custom_name_func=None): - globbed_files = glob.glob(glob_pattern) + def _move_and_prepare_files( + self, + glob_pattern: str, + min_files_required: Optional[int], + custom_name_func: Optional[Callable[[Path], str]], + ) -> list[Path]: + tmp_path = Path('/tmp') + globbed_files = list(tmp_path.glob(glob_pattern)) # If min_files_required is not set, use the number of globbed files as the minimum min_files_required = min_files_required or len(globbed_files) @@ -183,18 +157,18 @@ class GraphCreator(abc.ABC): raise FileNotFoundError(f'{min_files_required} CSV files are needed to create the {self._type} graphs!') lognames = [] - for filename in sorted(globbed_files, key=os.path.getmtime, reverse=True)[:min_files_required]: - FileManager.wait_file_ready(filename) - custom_name = custom_name_func(filename) if custom_name_func else os.path.basename(filename) - new_file = os.path.join(self._folder, f'{self._type}_{self._graph_date}_{custom_name}.csv') - shutil.move(filename, new_file) - FileManager.wait_file_ready(new_file) + for filename in sorted(globbed_files, key=lambda f: f.stat().st_mtime, reverse=True)[:min_files_required]: + fm.wait_file_ready(filename) + custom_name = custom_name_func(filename) if custom_name_func else filename.name + new_file = self._folder / f'{self._type}_{self._graph_date}_{custom_name}.csv' + filename.rename(new_file) + fm.wait_file_ready(new_file) lognames.append(new_file) return lognames - def _save_figure_and_cleanup(self, fig, lognames, axis_label=None): + def _save_figure_and_cleanup(self, fig: Figure, lognames: list[Path], axis_label: Optional[str]) -> None: axis_suffix = f'_{axis_label}' if axis_label else '' - png_filename = os.path.join(self._folder, f'{self._type}_{self._graph_date}{axis_suffix}.png') + png_filename = self._folder / f'{self._type}_{self._graph_date}{axis_suffix}.png' fig.savefig(png_filename, dpi=self._dpi) if self._keep_csv: @@ -202,133 +176,157 @@ class GraphCreator(abc.ABC): else: self._remove_files(lognames) - def _archive_files(self, _): + def _archive_files(self, _: list[Path]) -> None: return - def _remove_files(self, lognames): + def _remove_files(self, lognames: list[Path]) -> None: for csv in lognames: - if os.path.exists(csv): - os.remove(csv) + csv.unlink(missing_ok=True) @abc.abstractmethod - def create_graph(self): + def create_graph(self) -> None: pass @abc.abstractmethod - def clean_old_files(self, keep_results): + def clean_old_files(self, keep_results: int) -> None: pass class BeltsGraphCreator(GraphCreator): - def __init__(self, keep_csv=False, dpi=150): + def __init__(self, keep_csv: bool = False, dpi: int = 150): super().__init__(keep_csv, dpi) self._setup_folder('belts') - def create_graph(self): + def create_graph(self) -> None: lognames = self._move_and_prepare_files( - glob_pattern='/tmp/raw_data_axis*.csv', + glob_pattern='raw_data_axis*.csv', min_files_required=2, - custom_name_func=lambda f: os.path.basename(f).split('_')[3].split('.')[0].upper(), + custom_name_func=lambda f: f.stem.split('_')[3].upper(), + ) + fig = belts_calibration( + lognames=[str(path) for path in lognames], + klipperdir=str(Config.KLIPPER_FOLDER), + st_version=self._version, ) - fig = belts_calibration(lognames, Config.KLIPPER_FOLDER, st_version=self._version) self._save_figure_and_cleanup(fig, lognames) - def clean_old_files(self, keep_results=3): - folder = self._folder - files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.png')] - files.sort(key=os.path.getmtime, reverse=True) + def clean_old_files(self, keep_results: int = 3) -> None: + # Get all PNG files in the directory as a list of Path objects + files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True) if len(files) <= keep_results: - return - else: - for old_file in files[keep_results:]: - file_date = '_'.join(os.path.splitext(os.path.basename(old_file))[0].split('_')[1:3]) - for suffix in ['A', 'B']: - csv_file = os.path.join(folder, f'belt_{file_date}_{suffix}.csv') - if os.path.exists(csv_file): - os.remove(csv_file) - os.remove(old_file) + return # No need to delete any files + + # Delete the older files + for old_file in files[keep_results:]: + file_date = '_'.join(old_file.stem.split('_')[1:3]) + for suffix in ['A', 'B']: + csv_file = self._folder / f'belts_{file_date}_{suffix}.csv' + csv_file.unlink(missing_ok=True) + old_file.unlink() class ShaperGraphCreator(GraphCreator): - def __init__(self, max_smoothing=None, scv=5.0, keep_csv=False, dpi=150): + def __init__(self, keep_csv: bool = False, dpi: int = 150): super().__init__(keep_csv, dpi) - self._max_smoothing = max_smoothing - self._scv = scv + self._max_smoothing = None + self._scv = None self._setup_folder('shaper') - def create_graph(self): + def configure(self, scv: float, max_smoothing: float = None) -> None: + self._scv = scv + self._max_smoothing = max_smoothing + + def create_graph(self) -> None: + if not self._scv: + raise ValueError('scv must be set to create the input shaper graph!') + lognames = self._move_and_prepare_files( - glob_pattern='/tmp/raw_data*.csv', + glob_pattern='raw_data*.csv', min_files_required=1, - custom_name_func=lambda f: os.path.basename(f).split('_')[3].split('.')[0].upper(), + custom_name_func=lambda f: f.stem.split('_')[3].upper(), ) fig = shaper_calibration( - lognames, Config.KLIPPER_FOLDER, max_smoothing=self._max_smoothing, scv=self._scv, st_version=self._version + lognames=[str(path) for path in lognames], + klipperdir=str(Config.KLIPPER_FOLDER), + max_smoothing=self._max_smoothing, + scv=self._scv, + st_version=self._version, ) - self._save_figure_and_cleanup(fig, lognames, lognames[0].split('_')[-1].split('.')[0]) + self._save_figure_and_cleanup(fig, lognames, lognames[0].stem.split('_')[-1]) - def clean_old_files(self, keep_results=3): - folder = self._folder - files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.png')] - files.sort(key=os.path.getmtime, reverse=True) + def clean_old_files(self, keep_results: int = 3) -> None: + # Get all PNG files in the directory as a list of Path objects + files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True) if len(files) <= 2 * keep_results: - return - else: # delete the older files - for old_file in files[2 * keep_results :]: - csv_file = os.path.join(folder, os.path.splitext(os.path.basename(old_file))[0] + '.csv') - if os.path.exists(csv_file): - os.remove(csv_file) - os.remove(old_file) + return # No need to delete any files + + # Delete the older files + for old_file in files[2 * keep_results :]: + csv_file = old_file.with_suffix('.csv') + csv_file.unlink(missing_ok=True) + old_file.unlink() class VibrationsGraphCreator(GraphCreator): - def __init__(self, kinematics, accel, chip_name, keep_csv=False, dpi=150): + def __init__(self, keep_csv: bool = False, dpi: int = 150): super().__init__(keep_csv, dpi) + self._kinematics = None + self._accel = None + self._chip_name = None + + self._setup_folder('vibrations') + + def configure(self, kinematics: str, accel: float, chip_name: str) -> None: self._kinematics = kinematics self._accel = accel self._chip_name = chip_name - self._setup_folder('vibrations') - - def _archive_files(self, lognames): - with tarfile.open(os.path.join(self._folder, f'{self._type}_{self._graph_date}.tar.gz'), 'w:gz') as tar: + def _archive_files(self, lognames: list[Path]) -> None: + tar_path = self._folder / f'{self._type}_{self._graph_date}.tar.gz' + with tarfile.open(tar_path, 'w:gz') as tar: for csv_file in lognames: - tar.add(csv_file, arcname=os.path.basename(csv_file), recursive=False) + tar.add(csv_file, arcname=csv_file.name, recursive=False) + + def create_graph(self) -> None: + if not self._accel or not self._chip_name or not self._kinematics: + raise ValueError('accel, chip_name and kinematics must be set to create the vibrations profile graph!') - def create_graph(self): lognames = self._move_and_prepare_files( - glob_pattern=f'/tmp/{self._chip_name}-*.csv', + glob_pattern=f'{self._chip_name}-*.csv', min_files_required=None, - custom_name_func=lambda f: os.path.basename(f).replace(self._chip_name, self._type), + custom_name_func=lambda f: f.name.replace(self._chip_name, self._type), ) fig = vibrations_profile( - lognames, Config.KLIPPER_FOLDER, kinematics=self._kinematics, accel=self._accel, st_version=self._version + lognames=[str(path) for path in lognames], + klipperdir=str(Config.KLIPPER_FOLDER), + kinematics=self._kinematics, + accel=self._accel, + st_version=self._version, ) self._save_figure_and_cleanup(fig, lognames) - def clean_old_files(self, keep_results=3): - folder = self._folder - files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.png')] - files.sort(key=os.path.getmtime, reverse=True) + def clean_old_files(self, keep_results: int = 3) -> None: + # Get all PNG files in the directory as a list of Path objects + files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True) if len(files) <= keep_results: - return - else: # delete the older files - for old_file in files[keep_results:]: - os.remove(old_file) - tar_file = os.path.join(folder, os.path.splitext(os.path.basename(old_file))[0] + '.tar.gz') - if os.path.exists(tar_file): - os.remove(tar_file) + return # No need to delete any files + + # Delete the older files + for old_file in files[keep_results:]: + old_file.unlink() + tar_file = old_file.with_suffix('.tar.gz') + tar_file.unlink(missing_ok=True) class AxesMapFinder: - def __init__(self, accel, chip_name): + def __init__(self, accel: float, chip_name: str): self._accel = accel self._chip_name = chip_name @@ -337,56 +335,73 @@ class AxesMapFinder: self._type = 'axesmap' self._folder = Config.RESULTS_BASE_FOLDER - def find_axesmap(self): - globbed_files = glob.glob(f'/tmp/{self._chip_name}-*.csv') + def find_axesmap(self) -> None: + tmp_folder = Path('/tmp') + globbed_files = list(tmp_folder.glob(f'{self._chip_name}-*.csv')) + if not globbed_files: raise FileNotFoundError('no CSV files found in the /tmp folder to find the axes map!') # Find the CSV files with the latest timestamp and wait for it to be released by Klipper - logname = sorted(globbed_files, key=os.path.getmtime, reverse=True)[0] - FileManager.wait_file_ready(logname) + logname = sorted(globbed_files, key=lambda f: f.stat().st_mtime, reverse=True)[0] + fm.wait_file_ready(logname) - results = axesmap_calibration([logname], self._accel) - result_filename = os.path.join(self._folder, f'{self._type}_{self._graph_date}.txt') - with open(result_filename, 'w') as f: + results = axesmap_calibration( + lognames=[str(logname)], + accel=self._accel, + ) + + result_filename = self._folder / f'{self._type}_{self._graph_date}.txt' + with result_filename.open('w') as f: f.write(results) def main(): options = Config.parse_arguments() - FileManager.ensure_folders_exist() + fm.ensure_folders_exist() print_with_c_locale(f'Shake&Tune version: {Config.get_git_version()}') - graph_creator = None - if options.type == 'belts': - graph_creator = BeltsGraphCreator(options.keep_csv, options.dpi) - elif options.type == 'shaper': - graph_creator = ShaperGraphCreator(options.max_smoothing, options.scv, options.keep_csv, options.dpi) - elif options.type == 'vibrations': - graph_creator = VibrationsGraphCreator( - options.kinematics, options.accel_used, options.chip_name, options.keep_csv, options.dpi - ) - elif options.type == 'axesmap': - graph_creator = AxesMapFinder(options.accel_used, options.chip_name) + graph_creators = { + 'belts': (BeltsGraphCreator, None), + 'shaper': (ShaperGraphCreator, lambda gc: gc.configure(options.scv, options.max_smoothing)), + 'vibrations': ( + VibrationsGraphCreator, + lambda gc: gc.configure(options.kinematics, options.accel_used, options.chip_name), + ), + 'axesmap': (AxesMapFinder, None), + } - if graph_creator: - try: - graph_creator.create_graph() - except FileNotFoundError as e: - print_with_c_locale(f'FileNotFound error: {e}') - return - except TimeoutError as e: - print_with_c_locale(f'Timeout error: {e}') - return - except Exception as e: - print_with_c_locale(f'Error while generating the graphs: {e}') - traceback.print_exc() - return + creator_info = graph_creators.get(options.type) + if not creator_info: + print_with_c_locale('Error: invalid graph type specified!') + return - print_with_c_locale(f'{options.type} graphs created successfully!') - graph_creator.clean_old_files(options.keep_results) - print_with_c_locale(f'Cleaned output folder to keep only the last {options.keep_results} results!') + # Instantiate the graph creator + graph_creator_class, configure_func = creator_info + graph_creator = graph_creator_class(options.keep_csv, options.dpi) + + # Configure it if needed + if configure_func: + configure_func(graph_creator) + + # And then run it + try: + graph_creator.create_graph() + except FileNotFoundError as e: + print_with_c_locale(f'FileNotFound error: {e}') + return + except TimeoutError as e: + print_with_c_locale(f'Timeout error: {e}') + return + except Exception as e: + print_with_c_locale(f'Error while generating the graphs: {e}') + traceback.print_exc() + return + + print_with_c_locale(f'{options.type} graphs created successfully!') + graph_creator.clean_old_files(options.keep_results) + print_with_c_locale(f'Cleaned output folder to keep only the last {options.keep_results} results!') if __name__ == '__main__':