Source code for xplogger.parser.metric

"""Implementation of Parser to parse metrics from logs."""

from __future__ import annotations

from typing import Callable, Optional

import pandas as pd

from xplogger.parser import log as log_parser
from xplogger.types import LogType, MetricType, ParseLineFunctionType


def parse_json_and_match_value(line: str) -> Optional[LogType]:
    """Parse a line as a JSON log and check whether it is a metric log.

    Thin wrapper that delegates to
    ``log_parser.parse_json_and_match_value`` with the value to match
    fixed to ``"metric"``.

    Args:
        line (str): a single line taken from a log file

    Returns:
        Optional[LogType]: whatever the underlying log parser returns for
            this line when matching against the value ``"metric"``
    """
    metric_value = "metric"
    return log_parser.parse_json_and_match_value(line=line, value=metric_value)
def group_metrics(metrics: list[MetricType]) -> dict[str, list[MetricType]]:
    """Place every metric into one single group.

    This is the default grouping strategy: no partitioning is performed,
    and the whole input list is stored under the key ``"all"``.

    Args:
        metrics (list[MetricType]): list of metrics to group

    Returns:
        dict[str, list[MetricType]]: Dictionary with exactly one entry,
            mapping ``"all"`` to the list that was passed in (the same
            object, not a copy)
    """
    grouped: dict[str, list[MetricType]] = {}
    grouped["all"] = metrics
    return grouped
def aggregate_metrics(metrics: list[MetricType]) -> list[MetricType]:
    """Return the metrics unchanged (default, no-op aggregation).

    Args:
        metrics (list[MetricType]): list of metrics to aggregate

    Returns:
        list[MetricType]: the very same list that was passed in; nothing
            is copied, merged or reordered
    """
    aggregated = metrics
    return aggregated
class Parser(log_parser.Parser):
    """Parser specialised for the metric entries found in the logs."""

    def __init__(self, parse_line: ParseLineFunctionType = parse_json_and_match_value):
        """Set up a parser for metric logs.

        Args:
            parse_line (ParseLineFunctionType): Function used to parse one
                line of the log file. It should return None when the line
                is not a valid log statement (eg error messages).
                Defaults to parse_json_and_match_value.
        """
        # Initialise the base parser first, then tag this instance as a
        # metric parser.
        super().__init__(parse_line)
        self.log_type = "metric"

    def parse_as_df(
        self,
        filepath_pattern: str,
        group_metrics: Callable[
            [list[LogType]], dict[str, list[LogType]]
        ] = group_metrics,
        aggregate_metrics: Callable[[list[LogType]], list[LogType]] = aggregate_metrics,
    ) -> dict[str, pd.DataFrame]:
        """Parse the metric logs and return them as a dict of dataframes.

        The work is done in two stages: the logs yielded by ``self.parse``
        for ``filepath_pattern`` are collected into a list, and that list
        is handed to ``metrics_to_df`` together with the grouping and
        aggregation functions.

        Args:
            filepath_pattern (str): filepath pattern to glob
            group_metrics (Callable[[list[LogType]], dict[str, list[LogType]]], optional):
                Function to group a list of metrics into a dictionary of
                (key, list of grouped metrics). Defaults to group_metrics.
            aggregate_metrics (Callable[[list[LogType]], list[LogType]], optional):
                Function to aggregate a list of metrics. Defaults to
                aggregate_metrics.

        Returns:
            dict[str, pd.DataFrame]: the mapping produced by
                ``metrics_to_df``, i.e. one dataframe per group key
        """
        return metrics_to_df(
            # Materialise everything ``self.parse`` yields before conversion.
            metric_logs=list(self.parse(filepath_pattern)),
            group_metrics=group_metrics,
            aggregate_metrics=aggregate_metrics,
        )
def metrics_to_df(
    metric_logs: list[LogType],
    group_metrics: Callable[[list[LogType]], dict[str, list[LogType]]] = group_metrics,
    aggregate_metrics: Callable[[list[LogType]], list[LogType]] = aggregate_metrics,
) -> dict[str, pd.DataFrame]:
    """Convert a list of metric logs into a dict of (key, dataframe).

    Steps:
        (i) split the metrics into groups with ``group_metrics``
        (ii) run ``aggregate_metrics`` on the metrics of every group
        (iii) turn each aggregated list into a dataframe using
        ``pd.json_normalize``

    Args:
        metric_logs (list[LogType]): list of metrics
        group_metrics (Callable[[list[LogType]], dict[str, list[LogType]]], optional):
            Function to group a list of metrics into a dictionary of
            (key, list of grouped metrics). Defaults to group_metrics.
        aggregate_metrics (Callable[[list[LogType]], list[LogType]], optional):
            Function to aggregate a list of metrics. Defaults to
            aggregate_metrics.

    Returns:
        dict[str, pd.DataFrame]: Dictionary mapping every key returned by
            ``group_metrics`` to the dataframe built from the aggregated
            metrics of that group
    """
    groups: dict[str, list[LogType]] = group_metrics(metric_logs)

    # Phase 1: aggregate every group before any dataframe is created.
    aggregated_by_key = {}
    for group_key, group_logs in groups.items():
        aggregated_by_key[group_key] = aggregate_metrics(group_logs)

    # Phase 2: flatten each aggregated list of logs into a dataframe.
    dataframes = {}
    for group_key, aggregated_logs in aggregated_by_key.items():
        dataframes[group_key] = pd.json_normalize(data=aggregated_logs)

    return dataframes