You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

129 lines
4.9 KiB

from collections import deque
import time
class Telemetry:
"""Handles timing and performance monitoring for different code sections."""
def __init__(self, window_size: int = 100):
self.window_size = window_size
self._current_timers = {}
self._last_results = {}
self._history = {} # Stores deques for history
def start_timer(self, name: str):
"""Starts a timer for a given operation."""
self._current_timers[name] = time.perf_counter()
def stop_timer(self, name: str) -> float:
"""Stops a timer and records the duration."""
if name not in self._current_timers:
print(f"Warning: Telemetry Timer '{name}' stopped without being started.")
return 0.0
start_time = self._current_timers.pop(name)
duration = time.perf_counter() - start_time
self.record_value(name, duration)
return duration
class Timer:
"""Context manager for timing operations."""
def __init__(self, telemetry, name: str):
self.telemetry = telemetry
self.name = name
def __enter__(self):
self.telemetry.start_timer(self.name)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.telemetry.stop_timer(self.name)
return False # Don't suppress exceptions
def timer(self, name: str):
"""Returns a context manager for timing an operation.
Example usage:
with telemetry.timer("operation_name"):
# Code to time goes here
"""
return self.Timer(self, name)
def record_value(self, name: str, value: float):
"""Records a pre-calculated value (e.g., duration, count)."""
self._last_results[name] = value
if name not in self._history:
self._history[name] = deque(maxlen=self.window_size)
self._history[name].append(value)
def get_last_timing(self) -> dict[str, float]:
"""Returns the timing results from the most recent cycle and clears internal state."""
results = self._last_results.copy()
# Don't clear _last_results here, let the logger decide when it's done
return results
def clear_last_timing(self):
"""Clears the timing results for the next iteration."""
self._last_results.clear()
def get_average(self, name: str) -> float | None:
"""Calculates the moving average for a recorded metric."""
if name not in self._history or not self._history[name]:
return None
return sum(self._history[name]) / len(self._history[name])
def get_history(self, name: str) -> deque[float] | None:
"""Returns the historical data for a metric."""
return self._history.get(name)
def log_timing_info(
self,
context: str = "",
threshold: float = 0.001,
log_averages: bool = True,
):
"""Logs timing information based on thresholds and averages."""
current_iteration_data = self.get_last_timing() # Get the latest data
significant_timings = {k: v for k, v in current_iteration_data.items() if v > threshold}
should_log = bool(significant_timings) # Log if any timing exceeds threshold
# Check averages condition
avg_data = {}
if log_averages:
for name in self._history:
avg = self.get_average(name)
if avg is not None:
avg_data[f"{name}_avg"] = avg
if avg_data: # Log if average data exists
should_log = True
# If nothing to log, return early
if not should_log:
self.clear_last_timing() # Clear data since it wasn't logged
return
log_lines = [f"\n{context} Timing breakdown:" if context else "\nTiming breakdown:"]
# Log current iteration significant timings
if significant_timings:
log_lines.append(f" Current Iteration (> {threshold*1000:.1f}ms):")
for name, duration in sorted(significant_timings.items()):
log_lines.append(f" {name}: {duration * 1000:.2f}ms")
elif current_iteration_data: # Log total time even if below threshold
total_key = next((k for k in current_iteration_data if "total" in k), None)
if total_key:
log_lines.append(
f" Current Iteration Total: {current_iteration_data[total_key] * 1000:.2f}ms"
)
# Log averages if requested and available
if log_averages and avg_data:
log_lines.append(f" Moving Averages (last {self.window_size} iters):")
for name, avg in sorted(avg_data.items()):
log_lines.append(f" {name}: {avg * 1000:.2f}ms")
# Print the collected log lines
print("\n".join(log_lines))
self.clear_last_timing() # Clear data now that it has been logged/processed