Files
klippain-shaketune-telegramm/shaketune/post_processing/graph_creator.py

279 lines
10 KiB
Python

#!/usr/bin/env python3
import abc
import re
import shutil
import tarfile
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
from matplotlib.figure import Figure
from ..helpers.console_output import ConsoleOutput
from ..measurement.motorsconfigparser import MotorsConfigParser
from ..shaketune_config import ShakeTuneConfig
from .analyze_axesmap import axesmap_calibration
from .graph_belts import belts_calibration
from .graph_shaper import shaper_calibration
from .graph_vibrations import vibrations_profile
class GraphCreator(abc.ABC):
def __init__(self, config: ShakeTuneConfig):
self._config = config
self._graph_date = datetime.now().strftime('%Y%m%d_%H%M%S')
self._version = ShakeTuneConfig.get_git_version()
self._type = None
self._folder = None
def _setup_folder(self, graph_type: str) -> None:
self._type = graph_type
self._folder = self._config.get_results_folder(graph_type)
def _move_and_prepare_files(
self,
glob_pattern: str,
min_files_required: Optional[int] = None,
custom_name_func: Optional[Callable[[Path], str]] = None,
) -> list[Path]:
tmp_path = Path('/tmp')
globbed_files = list(tmp_path.glob(glob_pattern))
# If min_files_required is not set, use the number of globbed files as the minimum
min_files_required = min_files_required or len(globbed_files)
if not globbed_files:
raise FileNotFoundError(f'no CSV files found in the /tmp folder to create the {self._type} graphs!')
if len(globbed_files) < min_files_required:
raise FileNotFoundError(f'{min_files_required} CSV files are needed to create the {self._type} graphs!')
lognames = []
for filename in sorted(globbed_files, key=lambda f: f.stat().st_mtime, reverse=True)[:min_files_required]:
custom_name = custom_name_func(filename) if custom_name_func else filename.name
new_file = self._folder / f'{self._type}_{self._graph_date}_{custom_name}.csv'
# shutil.move() is needed to move the file across filesystems (mainly for BTT CB1 Pi default OS image)
shutil.move(filename, new_file)
lognames.append(new_file)
return lognames
def _save_figure_and_cleanup(self, fig: Figure, lognames: list[Path], axis_label: Optional[str] = None) -> None:
axis_suffix = f'_{axis_label}' if axis_label else ''
png_filename = self._folder / f'{self._type}_{self._graph_date}{axis_suffix}.png'
fig.savefig(png_filename, dpi=self._config.dpi)
if self._config.keep_csv:
self._archive_files(lognames)
else:
self._remove_files(lognames)
def _archive_files(self, _: list[Path]) -> None:
return
def _remove_files(self, lognames: list[Path]) -> None:
for csv in lognames:
csv.unlink(missing_ok=True)
def get_type(self) -> str:
return self._type
@abc.abstractmethod
def create_graph(self) -> None:
pass
@abc.abstractmethod
def clean_old_files(self, keep_results: int) -> None:
pass
class BeltsGraphCreator(GraphCreator):
def __init__(self, config: ShakeTuneConfig):
super().__init__(config)
self._kinematics = None
self._accel_per_hz = None
self._setup_folder('belts')
def configure(self, kinematics: str = None, accel_per_hz: float = None) -> None:
self._kinematics = kinematics
self._accel_per_hz = accel_per_hz
def create_graph(self) -> None:
lognames = self._move_and_prepare_files(
glob_pattern='shaketune-belt_*.csv',
min_files_required=2,
custom_name_func=lambda f: f.stem.split('_')[1].upper(),
)
fig = belts_calibration(
lognames=[str(path) for path in lognames],
kinematics=self._kinematics,
klipperdir=str(self._config.klipper_folder),
accel_per_hz=self._accel_per_hz,
st_version=self._version,
)
self._save_figure_and_cleanup(fig, lognames)
def clean_old_files(self, keep_results: int = 3) -> None:
# Get all PNG files in the directory as a list of Path objects
files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
if len(files) <= keep_results:
return # No need to delete any files
# Delete the older files
for old_file in files[keep_results:]:
file_date = '_'.join(old_file.stem.split('_')[1:3])
for suffix in ['A', 'B']:
csv_file = self._folder / f'belts_{file_date}_{suffix}.csv'
csv_file.unlink(missing_ok=True)
old_file.unlink()
class ShaperGraphCreator(GraphCreator):
def __init__(self, config: ShakeTuneConfig):
super().__init__(config)
self._max_smoothing = None
self._scv = None
self._setup_folder('shaper')
def configure(self, scv: float, max_smoothing: float = None, accel_per_hz: float = None) -> None:
self._scv = scv
self._max_smoothing = max_smoothing
self._accel_per_hz = accel_per_hz
def create_graph(self) -> None:
if not self._scv:
raise ValueError('scv must be set to create the input shaper graph!')
lognames = self._move_and_prepare_files(
glob_pattern='shaketune-axis_*.csv',
min_files_required=1,
custom_name_func=lambda f: f.stem.split('_')[1].upper(),
)
fig = shaper_calibration(
lognames=[str(path) for path in lognames],
klipperdir=str(self._config.klipper_folder),
max_smoothing=self._max_smoothing,
scv=self._scv,
accel_per_hz=self._accel_per_hz,
st_version=self._version,
)
self._save_figure_and_cleanup(fig, lognames, lognames[0].stem.split('_')[-1])
def clean_old_files(self, keep_results: int = 3) -> None:
# Get all PNG files in the directory as a list of Path objects
files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
if len(files) <= 2 * keep_results:
return # No need to delete any files
# Delete the older files
for old_file in files[2 * keep_results :]:
csv_file = old_file.with_suffix('.csv')
csv_file.unlink(missing_ok=True)
old_file.unlink()
class VibrationsGraphCreator(GraphCreator):
def __init__(self, config: ShakeTuneConfig):
super().__init__(config)
self._kinematics = None
self._accel = None
self._motors = None
self._setup_folder('vibrations')
def configure(self, kinematics: str, accel: float, motor_config_parser: MotorsConfigParser) -> None:
self._kinematics = kinematics
self._accel = accel
self._motors = motor_config_parser.get_motors()
def _archive_files(self, lognames: list[Path]) -> None:
tar_path = self._folder / f'{self._type}_{self._graph_date}.tar.gz'
with tarfile.open(tar_path, 'w:gz') as tar:
for csv_file in lognames:
tar.add(csv_file, arcname=csv_file.name, recursive=False)
csv_file.unlink()
def create_graph(self) -> None:
if not self._accel or not self._kinematics:
raise ValueError('accel, chip_name and kinematics must be set to create the vibrations profile graph!')
lognames = self._move_and_prepare_files(
glob_pattern='shaketune-vib_*.csv',
min_files_required=None,
custom_name_func=lambda f: re.search(r'shaketune-vib_(.*?)_\d{8}_\d{6}', f.name).group(1),
)
fig = vibrations_profile(
lognames=[str(path) for path in lognames],
klipperdir=str(self._config.klipper_folder),
kinematics=self._kinematics,
accel=self._accel,
st_version=self._version,
motors=self._motors,
)
self._save_figure_and_cleanup(fig, lognames)
def clean_old_files(self, keep_results: int = 3) -> None:
# Get all PNG files in the directory as a list of Path objects
files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
if len(files) <= keep_results:
return # No need to delete any files
# Delete the older files
for old_file in files[keep_results:]:
old_file.unlink()
tar_file = old_file.with_suffix('.tar.gz')
tar_file.unlink(missing_ok=True)
class AxesMapFinder(GraphCreator):
def __init__(self, config: ShakeTuneConfig):
super().__init__(config)
self._graph_date = datetime.now().strftime('%Y%m%d_%H%M%S')
self._type = 'axesmap'
self._folder = config.get_results_folder()
self._accel = None
def configure(self, accel: int) -> None:
self._accel = accel
def find_axesmap(self) -> None:
tmp_folder = Path('/tmp')
globbed_files = list(tmp_folder.glob('shaketune-axemap_*.csv'))
if not globbed_files:
raise FileNotFoundError('no CSV files found in the /tmp folder to find the axes map!')
# Find the CSV files with the latest timestamp and process it
logname = sorted(globbed_files, key=lambda f: f.stat().st_mtime, reverse=True)[0]
results = axesmap_calibration(
lognames=[str(logname)],
accel=self._accel,
)
ConsoleOutput.print(results)
result_filename = self._folder / f'{self._type}_{self._graph_date}.txt'
with result_filename.open('w') as f:
f.write(results)
# While the AxesMapFinder doesn't directly create a graph, we need to implement this
# method to allow using it seemlessly like all the other GraphCreator objects
def create_graph(self) -> None:
self.find_axesmap()
def clean_old_files(self, keep_results: int) -> None:
tmp_folder = Path('/tmp')
globbed_files = list(tmp_folder.glob('shaketune-axemap_*.csv'))
for csv_file in globbed_files:
csv_file.unlink()