MLflow and Prometheus Integration for Deep Insights and Continuous Improvement
Machine learning and generative AI solutions are only as efficient and promising as their underlying setup, mathematical algorithms, and data. Developing ML services therefore requires extensive analysis, experimentation, engineering, validation, and testing. Many factors differ between a solution under development and a deployed product, and productionized services need a different set of checks to expose the gaps. This article shows how to analyze ML model performance and inference statistics in production environments.
Ensuring Reliable ML Solutions
When developing ML solutions, data scientists and machine learning practitioners focus mostly on inference and accuracy. A common pattern is to experiment with multiple algorithms while fine-tuning hyperparameters. However, the probability that these models will perform in production as they did during development is slim.
A multitude of outliers influences the reliability and performance of production models. We need a sound approach built on open-source services to ensure ML and LLM solutions continuously deliver value with 99% uptime.
Enhancing ML Model Performance
An underperforming ML model with poor inference and accuracy isn’t always due to poor data quality or inefficient model training. It could stem from issues with the model itself or the infrastructure. Understanding model metrics can reveal whether predictions are relevant and accurate. To improve model performance over time:
- Log predictive statistics at random intervals to analyze trends and model behavior.
- Capture and analyze all triggered events within the execution workflow for comprehensive insights.
- Use MLflow for model logging and Prometheus for infrastructure logging to identify and address issues.
- Automate analysis with Prometheus Alertmanager to detect anomalies in real time and notify teams for quick remediation.
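As a rough illustration of the first point, logging at random intervals can be approximated by sampling a fixed fraction of requests. The sketch below is an assumption rather than part of the setup described in this article: `should_log`, `sample_predictions`, and `SAMPLE_RATE` are hypothetical names, and the 5% rate is arbitrary.

```python
import random

SAMPLE_RATE = 0.05  # hypothetical: log roughly 5% of predictions


def should_log(sample_rate=SAMPLE_RATE, rng=random):
    """Return True for roughly `sample_rate` of calls."""
    return rng.random() < sample_rate


def sample_predictions(predictions, sample_rate=SAMPLE_RATE, seed=None):
    """Return the subset of predictions selected for logging."""
    rng = random.Random(seed)  # a seed makes the selection reproducible
    return [p for p in predictions if should_log(sample_rate, rng)]
```

Passing a seed is only useful for tests and replay; in a live service the default unseeded generator is usually what you want.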
Implementation of MLflow and Prometheus
A complete walkthrough of integrating MLflow and Prometheus is too much to cover in a single post. Let us assume that MLflow and Prometheus are already set up, with AWS as our cloud provider. The implementation has the following considerations:
- Leverage pre-trained models: We will not train an ML model from scratch and evaluate it. Assuming we have a pre-trained ML model at our disposal, we will logically wrap the model and attempt to capture model metrics and evaluate model performance and accuracy.
- Use Prometheus for insights: We will access the Prometheus client to understand what is happening under the hood, how the infrastructure and model behavior correlate, and much more through metrics.
- Set up alerts: Finally, we will set up alerts to notify business and development teams when the model drifts or underperforms.
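To make the idea of logically wrapping a pre-trained model more concrete, here is a minimal sketch. It assumes only that the model exposes a `.predict()` method; `TimedModel`, `EchoModel`, and the `clock` parameter are hypothetical names introduced for illustration, and the latency threshold is configurable.

```python
import time


class TimedModel:
    """Wrap any object with a .predict() method and record slow calls."""

    def __init__(self, model, threshold_seconds=10.0, clock=time.perf_counter):
        self.model = model
        self.threshold_seconds = threshold_seconds
        self.clock = clock  # injectable so the timing can be tested
        self.slow_calls = []  # durations (seconds) of calls over the threshold

    def predict(self, input_data):
        start = self.clock()
        prediction = self.model.predict(input_data)
        duration = self.clock() - start
        if duration > self.threshold_seconds:
            self.slow_calls.append(duration)
        return prediction


class EchoModel:
    """Stand-in for a pre-trained model, used only for this example."""

    def predict(self, input_data):
        return input_data


wrapped = TimedModel(EchoModel(), threshold_seconds=10.0)
result = wrapped.predict([1, 2, 3])
```

The wrapper leaves the prediction untouched and only observes it, so callers can swap it in without changing how they consume results.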
Step 1. Capturing Model Metrics
Capturing model behavior in production environments can add overhead. Given the scale of the application and the number of incoming requests, we must be careful about what we log. For example, if we attempted to log every request made to a service at the scale of ChatGPT, our blob storage would fill up quickly and storage costs would climb.
Let us assume our model normally takes no more than 10 seconds to predict or generate an output. It then makes sense to capture only the predictions that took longer than 10 seconds. For every prediction that exceeds this threshold, we will capture the prediction_time, prediction accuracy, loss, and a few other metrics, and log them to the MLflow tracking server for reference.
import json
import time
from datetime import datetime, timezone

import boto3
import mlflow
from sklearn.metrics import accuracy_score, mean_squared_error

s3_client = boto3.client("s3")

# Path to the pre-trained model artifact. Load it with the loader that matches
# how it was saved; the function below expects an object with a .predict() method.
MODEL_PATH = "prometheus-eval/prometheus-7b-v2.0.pkl"
# boto3 expects the bare bucket name (no "s3://" prefix); this name is a placeholder.
S3_BUCKET_NAME = "ml-historical-metrics"
S3_FILE_PREFIX = "metrics"
PREDICTION_TIME_THRESHOLD = 10  # seconds


def model_predict_and_log_to_s3(model, input_data, true_labels,
                                bucket_name=S3_BUCKET_NAME, prefix=S3_FILE_PREFIX):
    start_time = time.time()
    prediction = model.predict(input_data)
    prediction_time = time.time() - start_time

    # Only slow predictions are logged, to keep tracking and storage costs down.
    if prediction_time > PREDICTION_TIME_THRESHOLD:
        accuracy = accuracy_score(true_labels, prediction)
        loss = mean_squared_error(true_labels, prediction)
        log_data = {
            "input_data": input_data,
            "prediction": prediction.tolist(),
            "accuracy": accuracy,
            "loss": loss,
            "prediction_time": prediction_time,
        }
        log_json = json.dumps(log_data, default=str)

        with mlflow.start_run():
            mlflow.log_metric("prediction_time", prediction_time)
            mlflow.log_metric("accuracy", accuracy)
            mlflow.log_metric("loss", loss)
            # Inputs and predictions can exceed MLflow's param length limit,
            # so they are stored as a JSON artifact instead of params.
            mlflow.log_text(log_json, "model_log.json")

        # Partition by date; a later slow prediction on the same day overwrites this key.
        today_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        s3_key = f"{prefix}/{today_date}/model_log.json"
        s3_client.put_object(Body=log_json, Bucket=bucket_name, Key=s3_key)

        # Defined in Step 2: snapshot resource metrics for this slow run.
        prometheus_capture_stats()

    return prediction
We partition the logs by date and capture them into log groups for easy retrieval and cleanup. Next, let us capture metrics from the compute resources and see how they react to predictions.
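The date-based partitioning mentioned above is what makes retrieval and cleanup simple. The following sketch shows one possible way to build partitioned keys and to pick out the ones that have aged past a retention window. The helper names `partition_key` and `expired_keys` and the 7-day retention are assumptions for illustration, not part of the original setup.

```python
from datetime import date, timedelta


def partition_key(prefix, day, filename):
    """Build a date-partitioned object key such as metrics/2024-09-02/model_log.json."""
    return f"{prefix}/{day.isoformat()}/{filename}"


def expired_keys(keys, today, retention_days):
    """Return keys whose date partition is older than the retention window."""
    cutoff = today - timedelta(days=retention_days)
    expired = []
    for key in keys:
        parts = key.split("/")
        try:
            partition_day = date.fromisoformat(parts[-2])
        except (IndexError, ValueError):
            continue  # not a date-partitioned key; leave it alone
        if partition_day < cutoff:
            expired.append(key)
    return expired


keys = [partition_key("metrics", date(2024, 9, d), "model_log.json") for d in (1, 2, 30)]
old = expired_keys(keys, today=date(2024, 9, 30), retention_days=7)
```

The keys returned in `old` could then be passed to a delete call. An S3 lifecycle rule scoped to the prefix is another option if cleanup should happen without any code.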
Step 2. Capturing Resource Utilization Stats
Comparing model behavior against resource utilization requires capturing resource statistics at a granular level. Applications from POC to enterprise grade rely on cloud resources to do their work. Let us concentrate on the most common ones: CPU, memory, disk reads and writes, and network I/O. We will capture these metrics using Prometheus for every model run that took more than 10 seconds.
import json
from datetime import datetime, timezone

import requests

# s3_client, S3_BUCKET_NAME, and S3_FILE_PREFIX are the ones defined in Step 1.
PROMETHEUS_URL = "http://<prometheus-server-url>:9090/api/v1/query"


def query_prometheus(query):
    response = requests.get(PROMETHEUS_URL, params={"query": query}, timeout=10)
    response.raise_for_status()
    return response.json()


queries = {
    # 1 minus the idle fraction gives the busy fraction (0 to 1)
    "cpu_usage": '1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[1m]))',
    "memory_usage": '100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)',
    "disk_read_io": 'rate(node_disk_read_bytes_total[1m])',
    "disk_write_io": 'rate(node_disk_written_bytes_total[1m])',
    "network_receive_io": 'rate(node_network_receive_bytes_total[1m])',
    "network_transmit_io": 'rate(node_network_transmit_bytes_total[1m])',
    "system_load": 'node_load1',
    "http_latency": 'histogram_quantile(0.95, sum by (le) (rate(http_request_duration_seconds_bucket[5m])))',
}


def prometheus_capture_stats():
    all_metrics = {}
    for metric_name, query in queries.items():
        result = query_prometheus(query)
        # An instant query returns one sample per series, each shaped like
        # {"metric": {...}, "value": [timestamp, "string value"]}.
        # Keep the highest value across series as a single number.
        values = [float(sample["value"][1]) for sample in result["data"]["result"]]
        all_metrics[metric_name] = max(values) if values else None

    # Prepare the JSON data with the current date
    today_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    json_data = {"date": today_date, "metrics": all_metrics}
    json_str = json.dumps(json_data, indent=4)

    # S3 key partitioned by today's date
    s3_file_path = f"{S3_FILE_PREFIX}/{today_date}/prom_stats.json"
    s3_client.put_object(Body=json_str, Bucket=S3_BUCKET_NAME, Key=s3_file_path)
    print(f"Metrics have been saved to {s3_file_path} in S3.")
The resulting JSON will look like the example below. We can apply custom logic to these metrics to understand the gaps and impacts in the workflow.
{
    "date": "2024-09-02",
    "metrics": {
        "cpu_usage": 0.75,
        "memory_usage": 80,
        "disk_read_io": 1500,
        "disk_write_io": 800,
        "network_receive_io": 1000,
        "network_transmit_io": 900,
        "system_load": 1.5,
        "http_latency": 0.2
    }
}
We can label the metrics captured at peak usage times and store them in log groups alongside the regular usage metrics, which helps when reviewing workflow execution at peak times. The data can also help forecast demand, and the infrastructure team can use it to prepare the backends to scale accordingly.
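One possible way to label peak usage is to average a metric per hour of day and tag snapshots that fall in the busiest hours. This is a sketch under assumptions: the per-snapshot `timestamp` field, the `usage_window` label, and the helper names are hypothetical and not part of the JSON shown above, which records only the date.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean


def peak_hours(snapshots, metric="cpu_usage", top_n=2):
    """Return the hours of day (0-23) with the highest average value of `metric`."""
    by_hour = defaultdict(list)
    for snap in snapshots:
        hour = datetime.fromisoformat(snap["timestamp"]).hour
        by_hour[hour].append(snap["metrics"][metric])
    averages = {hour: mean(values) for hour, values in by_hour.items()}
    ranked = sorted(averages, key=lambda h: averages[h], reverse=True)
    return sorted(ranked[:top_n])


def label_snapshot(snapshot, peaks):
    """Attach a 'peak' or 'off_peak' label to one snapshot."""
    hour = datetime.fromisoformat(snapshot["timestamp"]).hour
    return {**snapshot, "usage_window": "peak" if hour in peaks else "off_peak"}


# Hypothetical snapshots; the timestamp field is an assumption for this sketch.
snapshots = [
    {"timestamp": "2024-09-02T09:00:00", "metrics": {"cpu_usage": 0.40}},
    {"timestamp": "2024-09-02T14:00:00", "metrics": {"cpu_usage": 0.85}},
    {"timestamp": "2024-09-03T14:00:00", "metrics": {"cpu_usage": 0.75}},
    {"timestamp": "2024-09-03T20:00:00", "metrics": {"cpu_usage": 0.30}},
]
peaks = peak_hours(snapshots, top_n=1)
labeled = [label_snapshot(s, peaks) for s in snapshots]
```

With more history, the same hourly averages could feed a demand forecast or a scheduled scaling policy.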
Step 3. Correlating Model and Resource Metrics
With metrics in place, we can run scheduled checks to analyze the stats and implement remediations. We will set manual threshold values for all the resources we are interested in. In parallel, we will read the Prometheus and model metrics from S3 and compare them against the thresholds. Based on the condition and severity, we will take the necessary actions, such as alerting teams through Prometheus Alertmanager.
import json
from datetime import datetime, timezone

import requests

# s3_client, S3_BUCKET_NAME, and S3_FILE_PREFIX are the ones defined in Step 1.
# Alertmanager's v1 API has been removed in recent releases; v2 accepts the same alert list.
ALERTMANAGER_URL = "http://<alertmanager-url>:9093/api/v2/alerts"
today_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
PREDICTION_DATA_KEY = f"{S3_FILE_PREFIX}/{today_date}/model_log.json"
METRICS_DATA_KEY = f"{S3_FILE_PREFIX}/{today_date}/prom_stats.json"


def download_json_from_s3(bucket_name, key):
    obj = s3_client.get_object(Bucket=bucket_name, Key=key)
    return json.loads(obj["Body"].read().decode("utf-8"))


def send_alert(impacted_metrics):
    description = "The following metrics are impacting performance:\n" + "\n".join(
        f"{m['metric']}: {m['value']} (Threshold: {m['threshold']})"
        for m in impacted_metrics
    )
    alert_payload = [{
        "labels": {"alertname": "PerformanceIssue", "severity": "critical"},
        "annotations": {
            "summary": "Performance degradation detected",
            "description": description,
        },
    }]
    response = requests.post(ALERTMANAGER_URL, json=alert_payload, timeout=10)
    if response.ok:
        print("Alert sent successfully.")
    else:
        print(f"Failed to send alert. Status code: {response.status_code}")


thresholds = {
    "cpu_usage": 0.7,
    "memory_usage": 75,
    "disk_read_io": 1200,
    "disk_write_io": 1000,
    "network_receive_io": 800,
    "network_transmit_io": 800,
    "system_load": 1.0,
    "http_latency": 0.3,
}

prediction_data = download_json_from_s3(S3_BUCKET_NAME, PREDICTION_DATA_KEY)
metrics_data = download_json_from_s3(S3_BUCKET_NAME, METRICS_DATA_KEY)

if prediction_data["prediction_time"] > 10:
    print("Prediction time exceeded 10 seconds. Reviewing system metrics...")
    impacted_metrics = []
    # Assumes each metric was stored as a single number, as in the example JSON above.
    for metric, value in metrics_data["metrics"].items():
        threshold = thresholds.get(metric)
        # Skip metrics with no configured threshold or no recorded value.
        if threshold is None or value is None:
            continue
        if value > threshold:
            impacted_metrics.append({
                "metric": metric,
                "value": value,
                "threshold": threshold,
                "impact": "high",
            })
    if impacted_metrics:
        print("The following metrics are continuously impacting the overall performance:")
        for impact in impacted_metrics:
            print(f"- {impact['metric']}: {impact['value']} (Threshold: {impact['threshold']}) - Impact: {impact['impact']}")
        send_alert(impacted_metrics)
    else:
        print("No metrics were found to be impacting performance based on the thresholds.")
else:
    print("Prediction time is within acceptable limits. No further analysis required.")
We check the metrics to see which events occur repeatedly. The numbers make it easy to tell which resource is being exhausted or is causing gaps in the workflow.
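To find which breaches actually recur rather than appear once, the same threshold comparison can be applied across several stored snapshots and counted. The sketch below is illustrative only: `recurring_breaches`, `min_occurrences`, and the three days of sample values are assumptions, while the thresholds reuse values from the script above.

```python
from collections import Counter


def recurring_breaches(daily_metrics, thresholds, min_occurrences=2):
    """Count threshold breaches per metric across snapshots and keep
    the metrics that breached at least `min_occurrences` times."""
    counts = Counter()
    for metrics in daily_metrics:
        for name, value in metrics.items():
            limit = thresholds.get(name)
            if limit is not None and value is not None and value > limit:
                counts[name] += 1
    return {name: n for name, n in counts.items() if n >= min_occurrences}


thresholds = {"cpu_usage": 0.7, "memory_usage": 75, "system_load": 1.0}
# Hypothetical values for three days of snapshots.
daily_metrics = [
    {"cpu_usage": 0.75, "memory_usage": 80, "system_load": 0.9},
    {"cpu_usage": 0.82, "memory_usage": 60, "system_load": 1.5},
    {"cpu_usage": 0.71, "memory_usage": 78, "system_load": 0.8},
]
repeat_offenders = recurring_breaches(daily_metrics, thresholds)
```

Here CPU breaches on all three days and memory on two, so both are reported, while the single load spike is ignored. Raising `min_occurrences` narrows the result to the most persistent problems.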
Conclusion
Stagnation is the curse of modern enterprises. Applications and software, most importantly in the AI domain, need to evolve continuously. Delivering value at scale is the goal many enterprises aim to achieve, yet even when all industry best practices are followed, external anomalies can degrade performance. Businesses need data points to analyze the gaps and resolve them. For ML applications, integrating MLflow and Prometheus can deliver a sound, automated way to capture and analyze the service workflow and to trigger fixes based on the patterns found.