Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

from pathlib import Path
from shutil import disk_usage
from socket import gethostname
from typing import List

from diagnostic_msgs.msg import DiagnosticStatus, KeyValue
Expand All @@ -65,39 +64,39 @@

class HDMonitor(Node):
"""
Diagnostic node checking the remaining space on the specified hard drive.
Diagnostic node checking the remaining space on the specified hard drive(s).

Three ROS parameters:
- path: Path on the filesystem to check (string, default: home directory)
- path: Path(s) on the filesystem to check (string or array of strings, default: home directory)
- free_percent_low: Percentage at which to consider the space left as low
- free_percent_crit: Percentage at which to consider the space left as critical
"""

def __init__(self):
hostname = gethostname()
# Every invalid symbol is replaced by underscore.
# isalnum() alone also allows invalid symbols depending on the locale
cleaned_hostname = ''.join(
c if (c.isascii() and c.isalnum()) else '_' for c in hostname)
super().__init__(f'hd_monitor_{cleaned_hostname}')
super().__init__(f'hd_monitor')

self._path = '~'
self._paths = ['~']
self._free_percent_low = FREE_PERCENT_LOW
self._free_percent_crit = FREE_PERCENT_CRIT

self.add_on_set_parameters_callback(self.callback_config)
self.declare_parameter('path', self._path, ParameterDescriptor(
description='Path in which to check remaining space.'))
self.declare_parameter('path', self._paths, ParameterDescriptor(
description='Path(s) in which to check remaining space.'))
self.declare_parameter(
'free_percent_low', self._free_percent_low, ParameterDescriptor(
description='Warning threshold.', type=int()))
description='Warning threshold.'))
self.declare_parameter(
'free_percent_crit', self._free_percent_crit, ParameterDescriptor(
description='Error threshold.', type=int()))
description='Error threshold.'))

self._updater = Updater(self)
self._updater.setHardwareID(hostname)
self._updater.add(f'{hostname} HD Usage', self.check_disk_usage)
self._updater.setHardwareID("main_pc")

# Add diagnostic check for each path
for i, path in enumerate(self._paths):
self._updater.add(f'HD Usage {i}', lambda diag, idx=i: self.check_disk_usage(diag, idx))

def callback_config(self, params: List[rclpy.Parameter]):
"""
Expand All @@ -107,25 +106,33 @@ def callback_config(self, params: List[rclpy.Parameter]):
"""
for param in params:
if param.name == 'path':
self._path = str(
Path(param.value).expanduser().resolve(strict=True)
)
# Handle both single path (string) and multiple paths (array)
if isinstance(param.value, str):
self._paths = [str(Path(param.value).expanduser().resolve(strict=True))]
else:
self._paths = [str(Path(p).expanduser().resolve(strict=True)) for p in param.value]
elif param.name == 'free_percent_low':
self._free_percent_low = param.value
elif param.name == 'free_percent_crit':
self._free_percent_crit = param.value

return SetParametersResult(successful=True)

def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus:
def check_disk_usage(self, diag: DiagnosticStatus, path_index: int = 0) -> DiagnosticStatus:
"""
Compute the disk usage and derive a status from it.

Task periodically ran by the diagnostic updater.
"""
diag.level = DiagnosticStatus.OK

total, _, free = disk_usage(self._path)
if path_index >= len(self._paths):
diag.level = DiagnosticStatus.ERROR
diag.message = "Invalid path index"
return diag

path = self._paths[path_index]
total, _, free = disk_usage(path)
percent = free / total * 100.0

if percent > self._free_percent_low:
Expand All @@ -135,12 +142,12 @@ def check_disk_usage(self, diag: DiagnosticStatus) -> DiagnosticStatus:
else:
diag.level = DiagnosticStatus.ERROR

total_go = total // (1024 * 1024)
total_gb = total // (1024 ** 3)
diag.values.extend(
[
KeyValue(key='Name', value=self._path),
KeyValue(key='Name', value=path),
KeyValue(key='Status', value=DICT_STATUS[diag.level]),
KeyValue(key='Total (Go)', value=str(total_go)),
KeyValue(key='Total (GiB)', value=str(total_gb)),
KeyValue(key='Available (%)', value=str(round(percent, 1))),
]
)
Expand Down