Files
klippain-shaketune-telegramm/shaketune/post_processing/graph_creator.py
2024-05-19 12:10:26 +02:00

280 lines
10 KiB
Python

#!/usr/bin/env python3
import abc
import re
import shutil
import tarfile
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
from matplotlib.figure import Figure
from ..helpers import filemanager as fm
from ..helpers.console_output import ConsoleOutput
from ..measurement.motorsconfigparser import MotorsConfigParser
from ..shaketune_config import ShakeTuneConfig
from .analyze_axesmap import axesmap_calibration
from .graph_belts import belts_calibration
from .graph_shaper import shaper_calibration
from .graph_vibrations import vibrations_profile
class GraphCreator(abc.ABC):
    """Abstract base shared by all the result generators of this module.

    It provides the common plumbing: collecting the measurement CSV files
    from /tmp into the results folder, saving the rendered figure as a PNG
    and finally archiving or deleting the CSV files depending on the
    ``keep_csv`` setting of the configuration.
    """

    def __init__(self, config: ShakeTuneConfig):
        """Store the config and stamp this instance with the current date/time."""
        self._config = config
        # One timestamp per instance: it is reused in every produced file name
        self._graph_date = datetime.now().strftime('%Y%m%d_%H%M%S')
        self._version = ShakeTuneConfig.get_git_version()
        # Both stay unset until _setup_folder() is called (or a subclass assigns them)
        self._type = None
        self._folder = None

    def _setup_folder(self, graph_type: str) -> None:
        """Remember the graph type and ask the config for its results folder."""
        self._type = graph_type
        self._folder = self._config.get_results_folder(graph_type)

    def _move_and_prepare_files(
        self,
        glob_pattern: str,
        min_files_required: Optional[int] = None,
        custom_name_func: Optional[Callable[[Path], str]] = None,
    ) -> list[Path]:
        """Move the most recent matching CSV files from /tmp to the results folder.

        Args:
            glob_pattern: glob expression evaluated inside /tmp.
            min_files_required: number of files needed (and taken, newest
                first). When falsy, every matching file is taken.
            custom_name_func: optional callable turning a source path into
                the last part of the destination name; defaults to the
                source file name.

        Returns:
            The destination paths, ordered from newest to oldest source file.

        Raises:
            FileNotFoundError: when nothing matches or too few files match.
        """
        globbed_files = list(Path('/tmp').glob(glob_pattern))
        if not globbed_files:
            raise FileNotFoundError(f'no CSV files found in the /tmp folder to create the {self._type} graphs!')

        # Without an explicit minimum, whatever was found becomes the minimum
        min_files_required = min_files_required or len(globbed_files)
        if len(globbed_files) < min_files_required:
            raise FileNotFoundError(f'{min_files_required} CSV files are needed to create the {self._type} graphs!')

        newest_first = sorted(globbed_files, key=lambda f: f.stat().st_mtime, reverse=True)

        lognames = []
        for source in newest_first[:min_files_required]:
            # NOTE(review): presumably blocks until the file is completely written - confirm in helpers.filemanager
            fm.wait_file_ready(source)
            if custom_name_func:
                custom_name = custom_name_func(source)
            else:
                custom_name = source.name
            destination = self._folder / f'{self._type}_{self._graph_date}_{custom_name}.csv'
            # shutil.move() is needed to move the file across filesystems (mainly for BTT CB1 Pi default OS image)
            shutil.move(source, destination)
            lognames.append(destination)
        return lognames

    def _save_figure_and_cleanup(self, fig: Figure, lognames: list[Path], axis_label: Optional[str] = None) -> None:
        """Write the figure as a PNG in the results folder, then deal with the CSV files.

        Args:
            fig: figure to save.
            lognames: CSV files the figure was built from.
            axis_label: optional label appended to the PNG file name.
        """
        axis_suffix = ''
        if axis_label:
            axis_suffix = f'_{axis_label}'
        png_filename = self._folder / f'{self._type}_{self._graph_date}{axis_suffix}.png'
        fig.savefig(png_filename, dpi=self._config.dpi)

        # keep_csv selects between the archive hook and plain deletion
        handle_csv = self._archive_files if self._config.keep_csv else self._remove_files
        handle_csv(lognames)

    def _archive_files(self, _: list[Path]) -> None:
        """Archive hook: the default implementation does nothing, leaving the CSV files in place."""
        return

    def _remove_files(self, lognames: list[Path]) -> None:
        """Delete the given CSV files, ignoring those that are already gone."""
        for leftover in lognames:
            leftover.unlink(missing_ok=True)

    def get_type(self) -> str:
        """Return the graph type set for this creator."""
        return self._type

    @abc.abstractmethod
    def create_graph(self) -> None:
        """Produce the result files; implemented by each subclass."""

    @abc.abstractmethod
    def clean_old_files(self, keep_results: int) -> None:
        """Remove outdated result files; implemented by each subclass."""
class BeltsGraphCreator(GraphCreator):
    """Graph creator for the 'belts' results, built with belts_calibration()."""

    def __init__(self, config: ShakeTuneConfig):
        """Initialise the creator and bind it to the 'belts' results folder."""
        super().__init__(config)
        self._kinematics = None
        self._accel_per_hz = None
        self._setup_folder('belts')

    def configure(self, kinematics: str = None, accel_per_hz: float = None) -> None:
        """Store the measurement parameters.

        Args:
            kinematics: kept on the instance; this class does not forward
                it to belts_calibration().
            accel_per_hz: forwarded to belts_calibration().
        """
        self._kinematics = kinematics
        self._accel_per_hz = accel_per_hz

    def create_graph(self) -> None:
        """Collect the two newest belt CSV files, render the graph and save it."""

        def belt_label(f: Path) -> str:
            # Second '_' separated field of the file stem, in upper case
            return f.stem.split('_')[1].upper()

        lognames = self._move_and_prepare_files(
            glob_pattern='shaketune-belt_*.csv',
            min_files_required=2,
            custom_name_func=belt_label,
        )
        fig = belts_calibration(
            lognames=list(map(str, lognames)),
            klipperdir=str(self._config.klipper_folder),
            accel_per_hz=self._accel_per_hz,
            st_version=self._version,
        )
        self._save_figure_and_cleanup(fig, lognames)

    def clean_old_files(self, keep_results: int = 3) -> None:
        """Keep the ``keep_results`` newest PNG files; delete older ones with their CSVs."""
        # All PNG files of the folder, most recently modified first
        files = list(self._folder.glob('*.png'))
        files.sort(key=lambda f: f.stat().st_mtime, reverse=True)

        if len(files) > keep_results:
            for old_file in files[keep_results:]:
                # PNG stems look like 'belts_<date>_<time>': rebuild '<date>_<time>'
                file_date = '_'.join(old_file.stem.split('_')[1:3])
                for suffix in ('A', 'B'):
                    csv_file = self._folder / f'belts_{file_date}_{suffix}.csv'
                    csv_file.unlink(missing_ok=True)
                old_file.unlink()
class ShaperGraphCreator(GraphCreator):
    """Graph creator for the 'shaper' results, built with shaper_calibration()."""

    def __init__(self, config: ShakeTuneConfig):
        """Initialise the creator and bind it to the 'shaper' results folder."""
        super().__init__(config)
        self._max_smoothing = None
        self._scv = None
        # Declared here (as in BeltsGraphCreator) so the attribute always exists,
        # even when configure() has not been called yet
        self._accel_per_hz = None
        self._setup_folder('shaper')

    def configure(
        self, scv: float, max_smoothing: Optional[float] = None, accel_per_hz: Optional[float] = None
    ) -> None:
        """Store the parameters forwarded to shaper_calibration().

        Args:
            scv: mandatory value; create_graph() refuses to run while it is falsy.
            max_smoothing: optional value passed through unchanged.
            accel_per_hz: optional value passed through unchanged.
        """
        self._scv = scv
        self._max_smoothing = max_smoothing
        self._accel_per_hz = accel_per_hz

    def create_graph(self) -> None:
        """Collect the newest axis CSV file, render the graph and save it.

        Raises:
            ValueError: when scv has not been configured.
            FileNotFoundError: when no matching CSV file exists in /tmp.
        """
        if not self._scv:
            raise ValueError('scv must be set to create the input shaper graph!')

        lognames = self._move_and_prepare_files(
            glob_pattern='shaketune-axis_*.csv',
            min_files_required=1,
            custom_name_func=lambda f: f.stem.split('_')[1].upper(),
        )
        fig = shaper_calibration(
            lognames=[str(path) for path in lognames],
            klipperdir=str(self._config.klipper_folder),
            max_smoothing=self._max_smoothing,
            scv=self._scv,
            accel_per_hz=self._accel_per_hz,
            st_version=self._version,
        )
        # Moved CSV files are named '<type>_<date>_<time>_<LABEL>.csv': the last
        # field of the stem is reused as the suffix of the PNG file name
        self._save_figure_and_cleanup(fig, lognames, lognames[0].stem.split('_')[-1])

    def clean_old_files(self, keep_results: int = 3) -> None:
        """Keep the ``2 * keep_results`` newest PNG files; delete older ones with their CSVs."""
        # Get all PNG files in the directory as a list of Path objects
        files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
        # NOTE(review): the factor 2 presumably accounts for one PNG per axis (X and Y) - confirm
        if len(files) <= 2 * keep_results:
            return  # No need to delete any files

        # Delete the older files
        for old_file in files[2 * keep_results :]:
            # The CSV shares the PNG file name, only the extension differs
            csv_file = old_file.with_suffix('.csv')
            csv_file.unlink(missing_ok=True)
            old_file.unlink()
class VibrationsGraphCreator(GraphCreator):
    """Graph creator for the 'vibrations' results, built with vibrations_profile()."""

    # Captures the part of the CSV file name located between the
    # 'shaketune-vib_' prefix and the '<8 digits>_<6 digits>' stamp
    _CSV_NAME_PATTERN = re.compile(r'shaketune-vib_(.*?)_\d{8}_\d{6}')

    def __init__(self, config: ShakeTuneConfig):
        """Initialise the creator and bind it to the 'vibrations' results folder."""
        super().__init__(config)
        self._kinematics = None
        self._accel = None
        self._motors = None
        self._setup_folder('vibrations')

    def configure(self, kinematics: str, accel: float, motor_config_parser: MotorsConfigParser) -> None:
        """Store the parameters forwarded to vibrations_profile().

        Args:
            kinematics: mandatory, checked by create_graph().
            accel: mandatory, checked by create_graph().
            motor_config_parser: parser whose get_motors() result is stored.
        """
        self._kinematics = kinematics
        self._accel = accel
        self._motors = motor_config_parser.get_motors()

    @classmethod
    def _measurement_name(cls, csv_file: Path) -> str:
        """Return the label embedded in a vibrations CSV file name.

        Raises:
            ValueError: when the file name does not follow the expected
                'shaketune-vib_<label>_<date>_<time>' layout. Without this
                check a stray file matching the glob would surface as an
                obscure AttributeError on None.
        """
        match = cls._CSV_NAME_PATTERN.search(csv_file.name)
        if match is None:
            raise ValueError(f'unexpected name for a vibrations CSV file: {csv_file.name}')
        return match.group(1)

    def _archive_files(self, lognames: list[Path]) -> None:
        """Pack the CSV files into a single .tar.gz of the results folder and delete them."""
        tar_path = self._folder / f'{self._type}_{self._graph_date}.tar.gz'
        with tarfile.open(tar_path, 'w:gz') as tar:
            for csv_file in lognames:
                tar.add(csv_file, arcname=csv_file.name, recursive=False)
                # Safe to delete right away: the content was copied into the archive by add()
                csv_file.unlink()

    def create_graph(self) -> None:
        """Collect every vibrations CSV file, render the profile and save it.

        Raises:
            ValueError: when accel or kinematics is not configured, or when
                a CSV file name does not have the expected layout.
            FileNotFoundError: when no matching CSV file exists in /tmp.
        """
        if not self._accel or not self._kinematics:
            # Message lists exactly the two values that are checked above
            raise ValueError('accel and kinematics must be set to create the vibrations profile graph!')

        lognames = self._move_and_prepare_files(
            glob_pattern='shaketune-vib_*.csv',
            min_files_required=None,  # take every matching file
            custom_name_func=self._measurement_name,
        )
        fig = vibrations_profile(
            lognames=[str(path) for path in lognames],
            klipperdir=str(self._config.klipper_folder),
            kinematics=self._kinematics,
            accel=self._accel,
            st_version=self._version,
            motors=self._motors,
        )
        self._save_figure_and_cleanup(fig, lognames)

    def clean_old_files(self, keep_results: int = 3) -> None:
        """Keep the ``keep_results`` newest PNG files; delete older ones with their archives."""
        # Get all PNG files in the directory as a list of Path objects
        files = sorted(self._folder.glob('*.png'), key=lambda f: f.stat().st_mtime, reverse=True)
        if len(files) <= keep_results:
            return  # No need to delete any files

        # Delete the older files
        for old_file in files[keep_results:]:
            old_file.unlink()
            # '<type>_<date>.png' -> '<type>_<date>.tar.gz', the name used by _archive_files()
            tar_file = old_file.with_suffix('.tar.gz')
            tar_file.unlink(missing_ok=True)
class AxesMapFinder(GraphCreator):
    """Runs axesmap_calibration() on the newest axes map CSV and stores its text result."""

    def __init__(self, config: ShakeTuneConfig):
        """Initialise the finder; results go to the folder returned by the config."""
        super().__init__(config)
        self._graph_date = datetime.now().strftime('%Y%m%d_%H%M%S')
        self._type = 'axesmap'
        # Called without a graph type here, unlike the other creators
        self._folder = config.get_results_folder()
        self._accel = None

    def configure(self, accel: int) -> None:
        """Store the acceleration value forwarded to axesmap_calibration()."""
        self._accel = accel

    def find_axesmap(self) -> None:
        """Analyse the most recent axes map CSV file of /tmp.

        The result is printed to the console and written to a timestamped
        text file of the results folder.

        Raises:
            FileNotFoundError: when /tmp contains no matching CSV file.
        """
        candidates = list(Path('/tmp').glob('shaketune-axemap_*.csv'))
        if not candidates:
            raise FileNotFoundError('no CSV files found in the /tmp folder to find the axes map!')

        # Only the most recently modified CSV file is processed
        logname = max(candidates, key=lambda f: f.stat().st_mtime)
        results = axesmap_calibration(
            lognames=[str(logname)],
            accel=self._accel,
        )
        ConsoleOutput.print(results)

        result_filename = self._folder / f'{self._type}_{self._graph_date}.txt'
        with result_filename.open('w') as report:
            report.write(results)

    # While the AxesMapFinder doesn't directly create a graph, we need to implement this
    # method to allow using it seamlessly like all the other GraphCreator objects
    def create_graph(self) -> None:
        """Entry point shared with the other creators: delegates to find_axesmap()."""
        self.find_axesmap()

    def clean_old_files(self, keep_results: int) -> None:
        """Delete every axes map CSV file left in /tmp.

        ``keep_results`` is accepted to match the base class signature but is not used.
        """
        leftovers = list(Path('/tmp').glob('shaketune-axemap_*.csv'))
        for csv_file in leftovers:
            csv_file.unlink()