diff --git a/.gitignore b/.gitignore index 4d9702cc6..1d331c5c1 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,9 @@ RAG/notebooks/langchain/data/save_embedding # IntelliJ's project specific settings file .idea + +# Environment variables +.env + +# egg-info directories +**/egg-info diff --git a/industries/manufacturing/predictive_maintenance_agent/README.md b/industries/manufacturing/predictive_maintenance_agent/README.md new file mode 100644 index 000000000..dea59b614 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/README.md @@ -0,0 +1,182 @@ +# Predictive Maintenance Agent + +A comprehensive AI-powered predictive maintenance system built with NVIDIA AIQ Toolkit for turbofan engine health monitoring and failure prediction. + +Work done by: Vineeth Kalluru, Janaki Vamaraju, Sugandha Sharma, Ze Yang and Viraj Modak + +## Overview + +Predictive maintenance prevents costly downtime by identifying potential failures before they occur. This agent leverages AI to analyze sensor data from turbofan engines, predict remaining useful life (RUL), and provide actionable insights for maintenance teams. + +### Key Benefits +- **Prevent Costly Downtime**: Identify failures before they occur +- **Optimize Maintenance**: Perform maintenance only when needed +- **Extend Equipment Life**: Monitor health to maximize efficiency +- **Improve Safety**: Prevent catastrophic failures +- **Reduce Costs**: Minimize emergency repairs and disruptions + +## Dataset + +Uses the **NASA Turbofan Engine Degradation Simulation Dataset (C-MAPSS)** with: +- **21 Sensor Measurements**: Temperature, pressure, vibration, and flow +- **3 Operational Settings**: Different flight conditions +- **Multiple Engine Units**: Each with unique degradation patterns +- **Run-to-Failure Data**: Complete lifecycle from healthy operation to failure + +## Architecture + +Multi-agent architecture with: +- **React Agent Workflow**: Main orchestration using ReAct pattern +- **SQL Retriever Tool**: Generates SQL queries using NIM LLM +- **RUL Prediction Tool**: XGBoost model for remaining useful life prediction +- **Plotting Agent**: Multi-tool agent for data visualization +- **Vector Database**: ChromaDB for schema information storage + +#### Agentic workflow architecture diagram +![Agentic workflow](imgs/pred_maint_arch_diagram_img1.png) + +#### Agentic workflow architecture diagram w/ reasoning +![Agentic workflow w/ reasoning](imgs/pred_maint_arch_diagram_img2.png) + +## Setup and Installation + +### Prerequisites +- Python 3.11+ (< 3.13) +- Conda or Miniconda +- NVIDIA NIM API access +- Node.js v18+ (for web interface) + +### 1. Create Conda Environment + +```bash +conda create -n pdm python=3.11 +conda activate pdm +``` + +### 2. Install NVIDIA AIQ Toolkit + +```bash +git clone https://github.com/NVIDIA/AIQToolkit.git +cd AIQToolkit +uv pip install -e . +aiq --help + +# Optional: Remove cloned repo after installation +# cd .. && rm -rf AIQToolkit +``` + +### 3. Install Predictive Maintenance Agent + +```bash +cd .. +git clone https://github.com/NVIDIA/GenerativeAIExamples.git +cd GenerativeAIExamples/industries/manufacturing/predictive_maintenance_agent +uv pip install -e . +``` + +### 4. Environment Setup + +```bash +export NVIDIA_API_KEY=your_nvidia_api_key_here +``` + +### 5. Database Setup + +1. Download [NASA Turbofan Dataset](https://ti.arc.nasa.gov/tech/dash/groups/pcoe/prognostic-data-repository/) +2. Extract files to `data/` directory +3. 
Run setup script: +```bash +python setup_database.py +``` + +### 6. Configure Paths + +Update `configs/config.yml` with your local paths for database, models, and output directories. (A quick sanity check for the generated database is shown at the end of this README.) + +## Launch Server and UI + +### Start AIQ Server +```bash +aiq serve --config_file=configs/config.yml +``` +The server runs on `http://localhost:8000`. + +Note: The provided config files resolve paths through the `PWD_PATH` environment variable, so set it before starting the AIQ server; otherwise the server cannot locate the database, models, and output directories: + +```bash +export PWD_PATH=$(pwd) +aiq serve --config_file=configs/config.yml "$@" +``` +or, to use the reasoning workflow: +```bash +export PWD_PATH=$(pwd) +aiq serve --config_file=configs/config-reasoning.yml "$@" +``` + +### Setup Web Interface + +```bash +git clone https://github.com/NVIDIA/AIQToolkit-UI.git +cd AIQToolkit-UI +npm ci +npm run dev +``` +The UI is available at `http://localhost:3000` + +**Configure UI Settings:** +- Click the Settings icon (bottom left) +- Set HTTP URL to `/chat/stream` (recommended) +- Configure theme and WebSocket URL as needed + +## Example Prompts + +Test the system with these prompts: + +**Data Retrieval:** +``` +Retrieve the time in cycles and operational setting 1 from the FD001 test table for unit number 1 and plot its value vs time. +``` + +![Data Retrieval Example](imgs/test_prompt_1.png) + +**Visualization:** +``` +Retrieve real RUL of each unit in the FD001 test dataset. Then plot a distribution of it. +``` + +![Visualization Example](imgs/test_prompt_2.png) + +**Prediction:** +``` +Retrieve time in cycles, all sensor measurements and RUL value for engine unit 24 from FD001 test and RUL tables. Predict RUL for it. Finally, generate a plot to compare actual RUL value with predicted RUL value across time. +``` + +![Prediction Example](imgs/test_prompt_3.png) + +## Observability (Optional) + +Monitor your system with Phoenix: + +```bash +# Docker (recommended) +docker run -p 6006:6006 -p 4317:4317 arizephoenix/phoenix:latest + +# Or install as package +uv pip install arize-phoenix +phoenix serve +``` + +Access the dashboard at `http://localhost:6006` to monitor traces, performance, and costs. + +## Next Steps + +The agent provides a foundation for industrial AI applications. Planned enhancements include a memory layer for context retention, parallel tool execution for faster responses, an action-recommendation reasoning agent, real-time fault detection, and integration with NVIDIA's NV-Tesseract foundation models for improved accuracy.
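As a quick sanity check after running `setup_database.py` (step 5 above), you can query the generated SQLite file directly. This is a minimal sketch, not part of the shipped code; note that the setup script writes to `PredM_db/nasa_turbo.db` by default, while the sample configs expect `${PWD_PATH}/database/nasa_turbo.db`, so pass `--db-path` to the script or adjust the path below accordingly.

```python
import sqlite3

# Path assumption: the default output of setup_database.py; change it to match
# the location referenced by configs/config.yml (e.g. ./database/nasa_turbo.db).
conn = sqlite3.connect("PredM_db/nasa_turbo.db")

# Row counts for the three main tables created by the setup script
for table in ("training_data", "test_data", "rul_data"):
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"{table}: {count} rows")

# Peek at one engine from the FD001 test set
rows = conn.execute(
    "SELECT time_in_cycles, sensor_measurement_1 "
    "FROM test_data WHERE dataset = 'FD001' AND unit_number = 1 "
    "ORDER BY time_in_cycles LIMIT 5"
).fetchall()
print(rows)

conn.close()
```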
+ +--- + +**Resources:** +- [NVIDIA AIQ Toolkit Documentation](https://docs.nvidia.com/aiq-toolkit/) +- [Phoenix Observability](https://phoenix.arize.com/) +- [NV-Tesseract Models](https://developer.nvidia.com/blog/new-nvidia-nv-tesseract-time-series-models-advance-dataset-processing-and-anomaly-detection/) \ No newline at end of file diff --git a/industries/manufacturing/predictive_maintenance_agent/configs/config.yml b/industries/manufacturing/predictive_maintenance_agent/configs/config.yml new file mode 120000 index 000000000..113a15bec --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/configs/config.yml @@ -0,0 +1 @@ +/Users/vikalluru/Documents/GenerativeAIExamples/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/configs/config.yml \ No newline at end of file diff --git a/industries/manufacturing/predictive_maintenance_agent/imgs/intermediate_steps.png b/industries/manufacturing/predictive_maintenance_agent/imgs/intermediate_steps.png new file mode 100644 index 000000000..b0858ae6a Binary files /dev/null and b/industries/manufacturing/predictive_maintenance_agent/imgs/intermediate_steps.png differ diff --git a/industries/manufacturing/predictive_maintenance_agent/imgs/pred_maint_arch_diagram_img1.png b/industries/manufacturing/predictive_maintenance_agent/imgs/pred_maint_arch_diagram_img1.png new file mode 100644 index 000000000..99d5293a8 Binary files /dev/null and b/industries/manufacturing/predictive_maintenance_agent/imgs/pred_maint_arch_diagram_img1.png differ diff --git a/industries/manufacturing/predictive_maintenance_agent/imgs/pred_maint_arch_diagram_img2.png b/industries/manufacturing/predictive_maintenance_agent/imgs/pred_maint_arch_diagram_img2.png new file mode 100644 index 000000000..714fca155 Binary files /dev/null and b/industries/manufacturing/predictive_maintenance_agent/imgs/pred_maint_arch_diagram_img2.png differ diff --git a/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_1.png b/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_1.png new file mode 100644 index 000000000..06120de71 Binary files /dev/null and b/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_1.png differ diff --git a/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_2.png b/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_2.png new file mode 100644 index 000000000..83aa83870 Binary files /dev/null and b/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_2.png differ diff --git a/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_3.png b/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_3.png new file mode 100644 index 000000000..e1fa8fb50 Binary files /dev/null and b/industries/manufacturing/predictive_maintenance_agent/imgs/test_prompt_3.png differ diff --git a/industries/manufacturing/predictive_maintenance_agent/models/scaler_model.pkl b/industries/manufacturing/predictive_maintenance_agent/models/scaler_model.pkl new file mode 100644 index 000000000..00e6785f8 Binary files /dev/null and b/industries/manufacturing/predictive_maintenance_agent/models/scaler_model.pkl differ diff --git a/industries/manufacturing/predictive_maintenance_agent/models/xgb_model_fd001.pkl b/industries/manufacturing/predictive_maintenance_agent/models/xgb_model_fd001.pkl new file mode 100644 index 000000000..9996ec935 Binary files /dev/null and 
b/industries/manufacturing/predictive_maintenance_agent/models/xgb_model_fd001.pkl differ diff --git a/industries/manufacturing/predictive_maintenance_agent/pyproject.toml b/industries/manufacturing/predictive_maintenance_agent/pyproject.toml new file mode 100644 index 000000000..7efa09139 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools >= 64"] + +[project] +name = "predictive_maintenance_agent" +version = "0.1.0" +dependencies = [ + "aiqtoolkit[profiling, langchain, telemetry]", + "pydantic ~= 2.10.0, <2.11.0", +] +requires-python = ">=3.11,<3.13" +description = "Predictive maintenance workflow using AIQ" +classifiers = ["Programming Language :: Python"] +authors = [{ name = "Vineeth Kalluru" }] +maintainers = [{ name = "NVIDIA Corporation" }] + +[project.entry-points.'aiq.components'] +predictive_maintenance_agent = "predictive_maintenance_agent.register" diff --git a/industries/manufacturing/predictive_maintenance_agent/setup_database.py b/industries/manufacturing/predictive_maintenance_agent/setup_database.py new file mode 100644 index 000000000..4bf5ad222 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/setup_database.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 +""" +NASA Turbofan Engine Dataset to SQLite Database Converter + +This script converts the NASA Turbofan Engine Degradation Simulation Dataset (C-MAPSS) +from text files into a structured SQLite database for use with the predictive maintenance agent. + +The NASA dataset contains: +- Training data: Engine run-to-failure trajectories +- Test data: Engine trajectories of unknown remaining cycles +- RUL data: Ground truth remaining useful life values + +Dataset structure: +- unit_number: Engine unit identifier +- time_in_cycles: Operational time cycles +- operational_setting_1, 2, 3: Operating conditions +- sensor_measurement_1 to 21: Sensor readings +""" + +import sqlite3 +import pandas as pd +import numpy as np +import os +from pathlib import Path +import logging + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class NASADatasetProcessor: + """Processes NASA Turbofan Engine Dataset and creates SQLite database.""" + + def __init__(self, data_dir: str = "data", db_path: str = "PredM_db/nasa_turbo.db"): + """ + Initialize the processor. 
+ + Args: + data_dir: Directory containing NASA dataset text files + db_path: Path where SQLite database will be created + """ + self.data_dir = Path(data_dir) + self.db_path = Path(db_path) + + # Ensure database directory exists + self.db_path.parent.mkdir(exist_ok=True) + + # Define column names for the dataset + self.columns = [ + 'unit_number', 'time_in_cycles', + 'operational_setting_1', 'operational_setting_2', 'operational_setting_3', + 'sensor_measurement_1', 'sensor_measurement_2', 'sensor_measurement_3', + 'sensor_measurement_4', 'sensor_measurement_5', 'sensor_measurement_6', + 'sensor_measurement_7', 'sensor_measurement_8', 'sensor_measurement_9', + 'sensor_measurement_10', 'sensor_measurement_11', 'sensor_measurement_12', + 'sensor_measurement_13', 'sensor_measurement_14', 'sensor_measurement_15', + 'sensor_measurement_16', 'sensor_measurement_17', 'sensor_measurement_18', + 'sensor_measurement_19', 'sensor_measurement_20', 'sensor_measurement_21' + ] + + # Sensor descriptions for metadata + self.sensor_descriptions = { + 'sensor_measurement_1': 'Total temperature at fan inlet (°R)', + 'sensor_measurement_2': 'Total temperature at LPC outlet (°R)', + 'sensor_measurement_3': 'Total temperature at HPC outlet (°R)', + 'sensor_measurement_4': 'Total temperature at LPT outlet (°R)', + 'sensor_measurement_5': 'Pressure at fan inlet (psia)', + 'sensor_measurement_6': 'Total pressure in bypass-duct (psia)', + 'sensor_measurement_7': 'Total pressure at HPC outlet (psia)', + 'sensor_measurement_8': 'Physical fan speed (rpm)', + 'sensor_measurement_9': 'Physical core speed (rpm)', + 'sensor_measurement_10': 'Engine pressure ratio (P50/P2)', + 'sensor_measurement_11': 'Static pressure at HPC outlet (psia)', + 'sensor_measurement_12': 'Ratio of fuel flow to Ps30 (pps/psi)', + 'sensor_measurement_13': 'Corrected fan speed (rpm)', + 'sensor_measurement_14': 'Corrected core speed (rpm)', + 'sensor_measurement_15': 'Bypass Ratio', + 'sensor_measurement_16': 'Burner fuel-air ratio', + 'sensor_measurement_17': 'Bleed Enthalpy', + 'sensor_measurement_18': 'Required fan speed', + 'sensor_measurement_19': 'Required fan conversion speed', + 'sensor_measurement_20': 'High-pressure turbines Cool air flow', + 'sensor_measurement_21': 'Low-pressure turbines Cool air flow' + } + + def read_data_file(self, file_path: Path) -> pd.DataFrame: + """ + Read a NASA dataset text file and return as DataFrame. 
+ + Args: + file_path: Path to the text file + + Returns: + DataFrame with proper column names + """ + try: + # Read space-separated text file + df = pd.read_csv(file_path, sep='\s+', header=None, names=self.columns) + logger.info(f"Loaded {len(df)} records from {file_path.name}") + return df + except Exception as e: + logger.error(f"Error reading {file_path}: {e}") + return pd.DataFrame() + + def process_training_data(self, conn: sqlite3.Connection): + """Process training data files and create database tables.""" + logger.info("Processing training data...") + + training_files = [ + 'train_FD001.txt', 'train_FD002.txt', 'train_FD003.txt', 'train_FD004.txt' + ] + + all_training_data = [] + + for file_name in training_files: + file_path = self.data_dir / file_name + if file_path.exists(): + df = self.read_data_file(file_path) + if not df.empty: + # Add dataset identifier + df['dataset'] = file_name.replace('train_', '').replace('.txt', '') + + # Calculate RUL for training data (max cycle - current cycle) + df['RUL'] = df.groupby('unit_number')['time_in_cycles'].transform('max') - df['time_in_cycles'] + + all_training_data.append(df) + else: + logger.warning(f"Training file not found: {file_path}") + + if all_training_data: + training_df = pd.concat(all_training_data, ignore_index=True) + training_df.to_sql('training_data', conn, if_exists='replace', index=False) + logger.info(f"Created training_data table with {len(training_df)} records") + + def process_test_data(self, conn: sqlite3.Connection): + """Process test data files and create database tables.""" + logger.info("Processing test data...") + + test_files = [ + 'test_FD001.txt', 'test_FD002.txt', 'test_FD003.txt', 'test_FD004.txt' + ] + + all_test_data = [] + + for file_name in test_files: + file_path = self.data_dir / file_name + if file_path.exists(): + df = self.read_data_file(file_path) + if not df.empty: + # Add dataset identifier + df['dataset'] = file_name.replace('test_', '').replace('.txt', '') + all_test_data.append(df) + else: + logger.warning(f"Test file not found: {file_path}") + + if all_test_data: + test_df = pd.concat(all_test_data, ignore_index=True) + test_df.to_sql('test_data', conn, if_exists='replace', index=False) + logger.info(f"Created test_data table with {len(test_df)} records") + + def process_rul_data(self, conn: sqlite3.Connection): + """Process RUL (Remaining Useful Life) data files.""" + logger.info("Processing RUL data...") + + rul_files = [ + 'RUL_FD001.txt', 'RUL_FD002.txt', 'RUL_FD003.txt', 'RUL_FD004.txt' + ] + + all_rul_data = [] + + for file_name in rul_files: + file_path = self.data_dir / file_name + if file_path.exists(): + try: + # RUL files contain one RUL value per line for each test engine + rul_values = pd.read_csv(file_path, header=None, names=['RUL']) + rul_values['unit_number'] = range(1, len(rul_values) + 1) + rul_values['dataset'] = file_name.replace('RUL_', '').replace('.txt', '') + all_rul_data.append(rul_values[['unit_number', 'dataset', 'RUL']]) + logger.info(f"Loaded {len(rul_values)} RUL values from {file_name}") + except Exception as e: + logger.error(f"Error reading RUL file {file_path}: {e}") + else: + logger.warning(f"RUL file not found: {file_path}") + + if all_rul_data: + rul_df = pd.concat(all_rul_data, ignore_index=True) + rul_df.to_sql('rul_data', conn, if_exists='replace', index=False) + logger.info(f"Created rul_data table with {len(rul_df)} records") + + def create_metadata_tables(self, conn: sqlite3.Connection): + """Create metadata tables with sensor descriptions and 
dataset information.""" + logger.info("Creating metadata tables...") + + # Sensor metadata + sensor_metadata = pd.DataFrame([ + {'sensor_name': sensor, 'description': desc} + for sensor, desc in self.sensor_descriptions.items() + ]) + sensor_metadata.to_sql('sensor_metadata', conn, if_exists='replace', index=False) + + # Dataset metadata + dataset_metadata = pd.DataFrame([ + {'dataset': 'FD001', 'description': 'Sea level conditions', 'fault_modes': 1}, + {'dataset': 'FD002', 'description': 'Sea level conditions', 'fault_modes': 6}, + {'dataset': 'FD003', 'description': 'High altitude conditions', 'fault_modes': 1}, + {'dataset': 'FD004', 'description': 'High altitude conditions', 'fault_modes': 6} + ]) + dataset_metadata.to_sql('dataset_metadata', conn, if_exists='replace', index=False) + + logger.info("Created metadata tables") + + def create_indexes(self, conn: sqlite3.Connection): + """Create database indexes for better query performance.""" + logger.info("Creating database indexes...") + + indexes = [ + "CREATE INDEX IF NOT EXISTS idx_training_unit ON training_data(unit_number)", + "CREATE INDEX IF NOT EXISTS idx_training_dataset ON training_data(dataset)", + "CREATE INDEX IF NOT EXISTS idx_training_cycle ON training_data(time_in_cycles)", + "CREATE INDEX IF NOT EXISTS idx_test_unit ON test_data(unit_number)", + "CREATE INDEX IF NOT EXISTS idx_test_dataset ON test_data(dataset)", + "CREATE INDEX IF NOT EXISTS idx_test_cycle ON test_data(time_in_cycles)", + "CREATE INDEX IF NOT EXISTS idx_rul_unit ON rul_data(unit_number, dataset)" + ] + + for index_sql in indexes: + conn.execute(index_sql) + + conn.commit() + logger.info("Created database indexes") + + def create_views(self, conn: sqlite3.Connection): + """Create useful database views for common queries.""" + logger.info("Creating database views...") + + # View for latest sensor readings per engine + latest_readings_view = """ + CREATE VIEW IF NOT EXISTS latest_sensor_readings AS + SELECT t1.* + FROM training_data t1 + INNER JOIN ( + SELECT unit_number, dataset, MAX(time_in_cycles) as max_cycle + FROM training_data + GROUP BY unit_number, dataset + ) t2 ON t1.unit_number = t2.unit_number + AND t1.dataset = t2.dataset + AND t1.time_in_cycles = t2.max_cycle + """ + + # View for engine health summary + engine_health_view = """ + CREATE VIEW IF NOT EXISTS engine_health_summary AS + SELECT + unit_number, + dataset, + MAX(time_in_cycles) as total_cycles, + MIN(RUL) as final_rul, + AVG(sensor_measurement_1) as avg_fan_inlet_temp, + AVG(sensor_measurement_11) as avg_hpc_outlet_pressure, + AVG(sensor_measurement_21) as avg_lpt_cool_air_flow + FROM training_data + GROUP BY unit_number, dataset + """ + + conn.execute(latest_readings_view) + conn.execute(engine_health_view) + conn.commit() + logger.info("Created database views") + + def validate_database(self, conn: sqlite3.Connection): + """Validate the created database by running sample queries.""" + logger.info("Validating database...") + + validation_queries = [ + ("Training data count", "SELECT COUNT(*) FROM training_data"), + ("Test data count", "SELECT COUNT(*) FROM test_data"), + ("RUL data count", "SELECT COUNT(*) FROM rul_data"), + ("Unique engines in training", "SELECT COUNT(DISTINCT unit_number) FROM training_data"), + ("Datasets available", "SELECT DISTINCT dataset FROM training_data"), + ] + + for description, query in validation_queries: + try: + result = conn.execute(query).fetchone() + logger.info(f"{description}: {result[0] if isinstance(result[0], (int, float)) else result}") + 
except Exception as e: + logger.error(f"Validation query failed - {description}: {e}") + + def process_dataset(self): + """Main method to process the entire NASA dataset.""" + logger.info(f"Starting NASA dataset processing...") + logger.info(f"Data directory: {self.data_dir.absolute()}") + logger.info(f"Database path: {self.db_path.absolute()}") + + # Check if data directory exists + if not self.data_dir.exists(): + logger.error(f"Data directory not found: {self.data_dir}") + logger.info("Please download the NASA Turbofan Engine Degradation Simulation Dataset") + logger.info("and place the text files in the 'data' directory") + return False + + try: + # Connect to SQLite database + with sqlite3.connect(self.db_path) as conn: + logger.info(f"Connected to database: {self.db_path}") + + # Process all data files + self.process_training_data(conn) + self.process_test_data(conn) + self.process_rul_data(conn) + self.create_metadata_tables(conn) + self.create_indexes(conn) + self.create_views(conn) + + # Validate the database + self.validate_database(conn) + + logger.info("Database processing completed successfully!") + return True + + except Exception as e: + logger.error(f"Error processing database: {e}") + return False + +def main(): + """Main function to run the database setup.""" + import argparse + + parser = argparse.ArgumentParser(description="Convert NASA Turbofan Dataset to SQLite") + parser.add_argument("--data-dir", default="data", + help="Directory containing NASA dataset text files") + parser.add_argument("--db-path", default="PredM_db/nasa_turbo.db", + help="Path for output SQLite database") + + args = parser.parse_args() + + processor = NASADatasetProcessor(args.data_dir, args.db_path) + success = processor.process_dataset() + + if success: + print(f"\n✅ Database created successfully at: {args.db_path}") + print("\nDatabase contains the following tables:") + print("- training_data: Engine run-to-failure trajectories") + print("- test_data: Engine test trajectories") + print("- rul_data: Ground truth RUL values") + print("- sensor_metadata: Sensor descriptions") + print("- dataset_metadata: Dataset information") + print("\nUseful views created:") + print("- latest_sensor_readings: Latest readings per engine") + print("- engine_health_summary: Engine health statistics") + else: + print("\n❌ Database creation failed. 
Check the logs above.") + return 1 + + return 0 + +if __name__ == "__main__": + exit(main()) \ No newline at end of file diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/__init__.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/__init__.py @@ -0,0 +1 @@ + diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/configs/config-reasoning.yml b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/configs/config-reasoning.yml new file mode 100644 index 000000000..7086b9dd1 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/configs/config-reasoning.yml @@ -0,0 +1,207 @@ +general: + use_uvloop: true + telemetry: + logging: + console: + _type: console + level: DEBUG + tracing: + phoenix: + _type: phoenix + endpoint: http://localhost:6006/v1/traces + project: predictive-maintenance-app + +llms: + sql_llm: + _type: nim + model_name: "meta/llama-4-scout-17b-16e-instruct" + coding_llm: + _type: nim + model_name: "qwen/qwen2.5-coder-32b-instruct" + max_tokens: 2000 + reasoning_llm: + _type: nim + model_name: "qwen/qwq-32b" + +embedders: + vanna_embedder: + _type: nim + model_name: "nvidia/nv-embed-v1" + +functions: + sql_retriever: + _type: generate_sql_query_and_retrieve_tool + llm_name: sql_llm + embedding_name: vanna_embedder + vector_store_path: "${PWD_PATH}/database" + db_path: "${PWD_PATH}/database/nasa_turbo.db" + output_folder: "${PWD_PATH}/output_data" + predict_rul: + _type: predict_rul_tool + output_folder: "${PWD_PATH}/output_data" + scaler_path: "${PWD_PATH}/models/scaler_model.pkl" + model_path: "${PWD_PATH}/models/xgb_model_fd001.pkl" + code_execution: + _type: code_execution + uri: http://127.0.0.1:6000/execute + sandbox_type: local + max_output_characters: 2000 + data_analysis_assistant: + _type: react_agent + llm_name: coding_llm + max_iterations: 5 + tool_names: [sql_retriever, code_execution, predict_rul] + system_prompt: | + You are a helpful data analysis assistant that can help with predictive maintenance tasks for a turbofan engine. You will work with planning agent + that provides a plan to you which you should follow: + You can use the following tools to help with your task: + {tools} + + Note: Your output_data folder is in "${PWD_PATH}/output_data" path. + However, the code execution sandbox runs with /workspace as the working directory (mounted to your local output_data folder) + So, when you are using the code execution tool, you should use relative paths starting with './' for file operations. + For example, if you want to read a file from the output_data folder, you should use './filename.json' as the path. + + But when passing any generated JSON file to other tools, you should use the absolute path to the file. + For example, if you want to pass the file 'engine_unit_24_sensor_data.json' to the predict_rul tool, you should use the absolute path to the file. 
+ which is "${PWD_PATH}/output_data/engine_unit_24_sensor_data.json" + + **EXAMPLE CODE STRUCTURE:** + ### START PYTHON CODE ### + import pandas as pd + import plotly.graph_objects as go + + # Load data using relative path (working directory is /workspace mounted to your output_data) + data = pd.read_json('your_input_file.json') + + # Create your analysis/plot + fig = go.Figure(data=[go.Scatter(x=data['time_in_cycles'], y=data['sensor_measurement_10'])]) + fig.update_layout(title='Your Plot Title') + + # Save to current directory (will appear in your local output_data folder) + fig.write_html('your_output_file.html') + print(f"Plot saved to: your_output_file.html") + ### END PYTHON CODE ### + + # File Handling and Tool Usage Guidelines + # -------------------------------- + # 1. HTML File Paths + # - All HTML files from code execution are saved in the output_data directory + # - Always include the full path when referencing HTML files to users + # - Example: "${PWD_PATH}/output_data/plot_name.html" + + # 2. SQL Query Policy + # - NEVER generate SQL queries manually + # - ALWAYS use the provided SQL retrieval tool + + # 3. Typical Workflow + # a) Data Extraction + # - Use SQL retrieval tool to fetch required data + # b) Data Processing + # - Generate Python code for analysis/visualization + # - Execute code using code execution tool + # - Save results in output_data directory + # c) Result Handling + # - Return processed information to calling agent + # - DO NOT USE MARKDOWN FORMATTING IN YOUR RESPONSE. + # - If the code execution tool responds with a warning in the stderr then ignore it and take action based on the stdout. + + # 4. Visualization Guidelines + # - Use plotly.js for creating interactive plots + # - Save visualizations as HTML files + # - Store all plots in output_data directory + # - When comparing actual and predicted RUL columns, convert the actual RUL column to piecewise its piecewise RUL values before plotting. + Piecewise RUL instructions: + 1) Calculate the true failure point by taking the last cycle in your data and adding the final RUL value at that cycle (e.g., if last cycle is 100 with RUL=25, true failure is at cycle 125). + 2) Create the piecewise pattern where if the true failure cycle is greater than MAXLIFE (125), RUL stays flat at MAXLIFE until the "knee point" (true_failure - MAXLIFE), then declines linearly to zero; otherwise RUL just declines linearly from MAXLIFE. + 3) Generate RUL values for each cycle in your data using this pattern - flat section gets constant MAXLIFE value, declining section decreases by (MAXLIFE / remaining_cycles_to_failure) each step. + 4) Replace the actual RUL column in your dataset with these calculated piecewise values while keeping all other columns unchanged. + 5) The result is a "knee-shaped" RUL curve that better represents equipment degradation patterns - flat during early life, then linear decline toward failure. 
+ + You may respond in one of two formats: + + Use the following format exactly when you want to use a tool: + + Question: the input question you must answer + Thought: you should always think about what to do + Action: the action to take, should be one of [{tool_names}] + Action Input: the input to the action (if there is no required input, include "Action Input: None") + Observation: wait for the tool to finish execution + + Use the following format exactly when you don't want to use a tool: + + Question: the input question you must answer + Thought: you should always think about what to do + Final Answer: the final answer to the original input question + + Use only the SQL retrieval tool for fetching data, do not generate code to do that. + +workflow: + _type: reasoning_agent + augmented_fn: data_analysis_assistant + llm_name: reasoning_llm + verbose: true + reasoning_prompt_template: | + You are a Data Analysis Reasoning and Planning Expert specialized in analyzing turbofan engine sensor data and predictive maintenance tasks. + You are tasked with creating detailed execution plans for addressing user queries while being conversational and helpful. + + **Your Role and Capabilities:** + - Expert in turbofan engine data analysis, predictive maintenance, and anomaly detection + - Create step-by-step execution plans using available tools + - Provide conversational responses while maintaining technical accuracy + - Only use tools when necessary to answer the user's question + + You are given a data analysis assistant to execute your plan; all you have to do is generate the plan. + DO NOT USE MARKDOWN FORMATTING IN YOUR RESPONSE. + + **Description:** + {augmented_function_desc} + + **Tools and their descriptions:** {tools} + + Guidelines: + 1. **Send the path to any HTML files generated to users** when tools return them (especially plotting results) + 2. **Only use tools if needed** - Not all queries require tool usage + + ---- + + Necessary Context: + You work with turbofan engine sensor data from multiple engines in a fleet. The data contains: + - **Time series data** from different engines, each with unique wear patterns and operational history, separated into + four datasets (FD001, FD002, FD003, FD004); each dataset is further divided into training and test subsets. + - **26 data columns**: unit number, time in cycles, 3 operational settings, and 21 sensor measurements + - **Engine lifecycle**: Engines start operating normally, then develop faults that grow until system failure + - **Predictive maintenance goal**: Predict Remaining Useful Life (RUL) - how many operational cycles before failure + - **Data characteristics**: Contains normal operational variation, sensor noise, and progressive fault development + This context helps you understand user queries about engine health, sensor patterns, failure prediction, and maintenance planning. + + For Anomaly Detection Tasks: + When performing anomaly detection, follow this comprehensive approach: + + 1) First get the sensor measurement information for the same engine number from both training and test datasets across different cycle times and order + it by increasing cycle time. + 2) Use the measurements from the training data to calculate statistical baselines (mean, standard deviation, moving averages), because they represent what + normal operational behavior looks like.
+ 3) Apply multiple statistical approaches to identify anomalies in test data: + - **Z-Score Analysis**: Compare test values against training data mean/std deviation using threshold (typically 3) + - **Moving Statistical Analysis**: Use rolling windows from training data to detect dynamic anomalies + - Flag data points that exceed statistical thresholds as potential anomalies + 4) Create comprehensive plots showing test data timeline with anomalies highlighted + - Use different colors/markers to distinguish between normal data and show all different types of anomalies + - Include hover information and legends for clear interpretation + - Save visualizations as interactive HTML files for detailed analysis + + ---- + + **User Input:** + {input_text} + + Analyze the input and create a comprehensive plan following this structure: + + Generate a plan that would look like this with numbered bullet points: + 1. Call tool A with input X + 2. Call tool B with input Y + 3. Interpret the output of tool A and B + 4. Return the final result + + **PLAN:** diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/configs/config.yml b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/configs/config.yml new file mode 100644 index 000000000..70b9cf40b --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/configs/config.yml @@ -0,0 +1,84 @@ +general: + use_uvloop: true + telemetry: + logging: + console: + _type: console + level: DEBUG + tracing: + phoenix: + _type: phoenix + endpoint: http://localhost:6006/v1/traces + project: predictive-maintenance-app + +llms: + sql_llm: + _type: nim + model_name: "meta/llama-4-scout-17b-16e-instruct" + plotting_llm: + _type: nim + model_name: "meta/llama-4-scout-17b-16e-instruct" + nim_llm: + _type: nim + model_name: "meta/llama-4-scout-17b-16e-instruct" + reasoning_llm: + _type: nim + model_name: "deepseek-ai/deepseek-r1-distill-qwen-32b" + +embedders: + vanna_embedder: + _type: nim + model_name: "nvidia/nv-embed-v1" + +functions: + sql_retriever: + _type: generate_sql_query_and_retrieve_tool + llm_name: sql_llm + embedding_name: vanna_embedder + vector_store_path: "${PWD_PATH}/database" + db_path: "${PWD_PATH}/database/nasa_turbo.db" + output_folder: "${PWD_PATH}/output_data" + plot_distribution: + _type: plot_distribution_tool + output_folder: "${PWD_PATH}/output_data" + plot_line_chart: + _type: plot_line_chart_tool + output_folder: "${PWD_PATH}/output_data" + plotting_agent: + _type: react_agent + llm_name: nim_llm + tool_names: [plot_line_chart, plot_distribution] + verbose: true + handle_tool_errors: true + description: "Use this agent to generate plots from the retrieved SQL data. Provide a description of the plot required along with the JSON file path containing the SQL output in the input message. + for example: Plot the sensor_measurement_1 vs time_in_cycles for unit 1 from the file ${PWD_PATH}/output_data/sql_output.json" + +workflow: + _type: react_agent + llm_name: nim_llm + tool_names: + - sql_retriever + - plotting_agent + verbose: true + system_prompt: | + You are a conversational helpful assistant that can help with predictive maintenance tasks for a turbofan engine. + Answer the user's question as best as you can while not doing more than what is asked. You can optionally use the following tools to help with your task: + {tools} + Not all queries require you to use the tools, only use them if you need to. 
+ If a tool returns an HTML file, send it to the user. + DO NOT GENERATE SQL QUERIES BY YOURSELF, ONLY USE THE TOOLS!!! + You may respond in one of two formats: + + Use the following format exactly when you want to use a tool: + + Question: the input question you must answer + Thought: you should always think about what to do + Action: the action to take, should be one of [{tool_names}] + Action Input: the input to the action (if there is no required input, include "Action Input: None") + Observation: wait for the tool to finish execution + + Use the following format exactly when you don't want to use a tool: + + Question: the input question you must answer + Thought: you should always think about what to do + Final Answer: the final answer to the original input question diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/generate_sql_query_and_retrieve_tool.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/generate_sql_query_and_retrieve_tool.py new file mode 100644 index 000000000..6b9d10486 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/generate_sql_query_and_retrieve_tool.py @@ -0,0 +1,250 @@ +import json +import logging +import os + +from aiq.builder.framework_enum import LLMFrameworkEnum +from pydantic import Field, BaseModel + +from aiq.builder.builder import Builder +from aiq.builder.function_info import FunctionInfo +from aiq.cli.register_workflow import register_function +from aiq.data_models.function import FunctionBaseConfig + +logger = logging.getLogger(__name__) + +class GenerateSqlQueryAndRetrieveToolConfig(FunctionBaseConfig, name="generate_sql_query_and_retrieve_tool"): + """ + AIQ Toolkit function to generate SQL queries and retrieve data. + """ + # Runtime configuration parameters + llm_name: str = Field(description="The name of the LLM to use for the function.") + embedding_name: str = Field(description="The name of the embedding to use for the function.") + vector_store_path: str = Field(description="The path to the vector store to use for the function.") + db_path: str = Field(description="The path to the SQL database to use for the function.") + output_folder: str = Field(description="The path to the output folder to use for the function.") + +@register_function(config_type=GenerateSqlQueryAndRetrieveToolConfig) +async def generate_sql_query_and_retrieve_tool( + config: GenerateSqlQueryAndRetrieveToolConfig, builder: Builder +): + """ + Generate a SQL query for a given question and retrieve the data from the database. + """ + class GenerateSqlQueryInputSchema(BaseModel): + input_question_in_english: str = Field(description="User's question in plain English to generate SQL query for") + + # Create Vanna instance + vanna_llm_config = builder.get_llm_config(config.llm_name) + vanna_embedder_config = builder.get_embedder_config(config.embedding_name) + + from langchain_core.prompts.chat import ChatPromptTemplate + + llm = await builder.get_llm(config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN) + + system_prompt = """ + You are an intelligent SQL query assistant that analyzes database query results and provides appropriate responses. + + Your responsibilities: + 1. Analyze the SQL query results and determine the best response format + 2. For data extraction queries (multiple rows/complex data): recommend saving to JSON file and provide summary + 3. 
For simple queries (single values, counts, yes/no): provide direct answers without file storage + 4. Always be helpful and provide context about the results + 5. Generate a descriptive filename for data that should be saved + + Guidelines: + - If results contain multiple rows or complex data (>5 rows or >3 columns): recommend saving to file + - If results are simple (single value, count, or small lookup): provide direct answer + - Always mention the SQL query that was executed + - Be clear about whether data was saved to a file or not + - For files to be saved, suggest a descriptive filename based on the query content (e.g., "sensor_data_unit_5.json", "engine_performance_analysis.json") + """ + + user_prompt = """ + Original Question: {original_question} + + SQL Query Executed: {sql_query} + + Query Results: + - Number of rows: {num_rows} + - Number of columns: {num_columns} + - Columns: {columns} + - Sample data (first few rows): {sample_data} + + Output directory: {output_dir} + + Please provide an appropriate response that either: + 1. Saves the data to JSON file and provides a summary (for complex/large datasets) - suggest a descriptive filename + 2. Directly answers the question with the results (for simple queries) + + Be conversational and helpful. Explain what was found and next steps if applicable. + If saving data, suggest a meaningful filename in the format: "descriptive_name.json" + + Important: Do not use template variables or placeholders in your response. Provide actual values and descriptions. + """ + + prompt = ChatPromptTemplate.from_messages([("system", system_prompt), ("user", user_prompt)]) + output_message = prompt | llm + + from .vanna_util import NIMVanna, initVanna, CustomEmbeddingFunction + def get_vanna_instance(vanna_llm_config, vanna_embedder_config, vector_store_path, db_path): + """ + Get a Vanna instance for the given configuration. + Initializes the Vanna instance if it is not already initialized. + + Args: + vanna_llm_config (dict): The configuration for the Vanna LLM. + vanna_embedder_config (dict): The configuration for the Vanna embedder. + vector_store_path (str): The path to the vector store. + db_path (str): The path to the SQL database. + + Returns: + NIMVanna: A Vanna instance. 
+ """ + vn_instance = NIMVanna( + VectorConfig={ + "client": "persistent", + "path": vector_store_path, + "embedding_function": CustomEmbeddingFunction( + api_key=os.getenv("NVIDIA_API_KEY"), + model=vanna_embedder_config.model_name) + }, + LLMConfig={ + "api_key": os.getenv("NVIDIA_API_KEY"), + "model": vanna_llm_config.model_name + } + ) + + # Connect to SQLite database + vn_instance.connect_to_sqlite(db_path) + + # Check if vector store directory is empty and initialize if needed + list_of_folders = [d for d in os.listdir(vector_store_path) + if os.path.isdir(os.path.join(vector_store_path, d))] + if len(list_of_folders) == 0: + logger.info("Initializing Vanna vector store...") + try: + initVanna(vn_instance) + logger.info("Vanna vector store initialization complete.") + except Exception as e: + logger.error(f"Error initializing Vanna vector store: {e}") + raise + else: + logger.info("Vanna vector store already initialized.") + return vn_instance + + vn_instance = get_vanna_instance(vanna_llm_config, vanna_embedder_config, config.vector_store_path, config.db_path) + + async def _response_fn(input_question_in_english: str) -> str: + # Process the input_question_in_english and generate output + if vn_instance is None: + return "Error: Vanna instance not available" + + sql = None + try: + sql = vn_instance.generate_sql(question=input_question_in_english) + logger.info(f"Generated SQL: {sql}") + except Exception as e: + return f"Error generating SQL: {e}" + + if not vn_instance.run_sql_is_set: + return f"Database is not connected via Vanna: {sql}" + + try: + df = vn_instance.run_sql(sql) + if df is None: + return f"Vanna run_sql returned None: {sql}" + if df.empty: + return f"No data found for the generated SQL: {sql}" + + num_rows = df.shape[0] + num_columns = df.shape[1] + columns = df.columns.tolist() + + # Get sample data (first 3 rows for preview) + sample_data = df.head(3).to_dict('records') + + # Use LLM to generate intelligent response + response = await output_message.ainvoke({ + "original_question": input_question_in_english, + "sql_query": sql, + "num_rows": num_rows, + "num_columns": num_columns, + "columns": ", ".join(columns), + "sample_data": json.dumps(sample_data, indent=2), + "output_dir": config.output_folder + }) + + # Check if LLM response suggests saving data (look for keywords or patterns) + llm_response = response.content if hasattr(response, 'content') else str(response) + + # Save data if it's complex (multiple rows or columns) or LLM suggests saving + should_save_data = ( + num_rows > 5 or + num_columns > 3 or + "save" in llm_response.lower() or + "saved" in llm_response.lower() or + "file" in llm_response.lower() + ) + + if should_save_data: + # Extract suggested filename from LLM response or use default + import re + filename_match = re.search(r'"([^"]+\.json)"', llm_response) + if filename_match: + suggested_filename = filename_match.group(1) + else: + # Generate a descriptive filename based on the question + import hashlib + # Clean the question for filename + clean_question = re.sub(r'[^\w\s-]', '', input_question_in_english.lower()) + clean_question = re.sub(r'\s+', '_', clean_question.strip())[:30] + if clean_question: + suggested_filename = f"{clean_question}_results.json" + else: + query_hash = hashlib.md5(input_question_in_english.encode()).hexdigest()[:8] + suggested_filename = f"sql_results_{query_hash}.json" + + sql_output_path = os.path.join(config.output_folder, suggested_filename) + + # Save the data to JSON file + os.makedirs(config.output_folder, 
exist_ok=True) + json_result = df.to_json(orient="records") + with open(sql_output_path, 'w') as f: + json.dump(json.loads(json_result), f, indent=4) + logger.info(f"Data saved to {sql_output_path}") + + # Clean up the LLM response and add file save confirmation + # Remove any object references that might have slipped through + cleaned_response = re.sub(r',\[object Object\],?', '', llm_response) + cleaned_response = re.sub(r'\[object Object\]', str(num_rows), cleaned_response) + + # If LLM didn't mention the actual saved path, append save confirmation + if sql_output_path not in cleaned_response: + cleaned_response += f"\n\n📁 Data has been saved to: {sql_output_path}" + cleaned_response += f"\n📊 File contains {num_rows} rows with columns: {', '.join(columns)}" + + return cleaned_response + + return llm_response + + except Exception as e: + return f"Error running SQL query '{sql}': {e}" + + description = """ + Use this tool to automatically generate SQL queries for the user's question, retrieve the data from the SQL database and store the data in a JSON file or provide a summary of the data. + Do not provide SQL query as input, only a question in plain english. + + Input: + - input_question_in_english: User's question or a question that you think is relevant to the user's question in plain english + + Output: Status of the generated SQL query's execution along with the output path. The tool will automatically generate descriptive filenames for saved data. + """ + yield FunctionInfo.from_fn(_response_fn, + input_schema=GenerateSqlQueryInputSchema, + description=description) + try: + pass + except GeneratorExit: + logger.info("Generate SQL query and retrieve function exited early!") + finally: + logger.info("Cleaning up generate_sql_query_and_retrieve_tool workflow.") diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_comparison_tool.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_comparison_tool.py new file mode 100644 index 000000000..5b43e856f --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_comparison_tool.py @@ -0,0 +1,363 @@ +import json +import logging +import os +import pandas as pd + +from pydantic import Field, BaseModel + +from aiq.builder.builder import Builder +from aiq.builder.function_info import FunctionInfo +from aiq.cli.register_workflow import register_function +from aiq.data_models.function import FunctionBaseConfig + +logger = logging.getLogger(__name__) + +def verify_json_path(file_path: str) -> str: + """ + Verify that the input is a valid path to a JSON file. 
+ + Args: + file_path (str): Path to verify + + Returns: + str: Verified file path + + Raises: + ValueError: If input is not a string or not a JSON file + FileNotFoundError: If file does not exist + json.JSONDecodeError: If file contains invalid JSON + """ + if not isinstance(file_path, str): + raise ValueError("Input must be a string path to a JSON file") + + if not file_path.lower().endswith('.json'): + raise ValueError("Input must be a path to a JSON file (ending with .json)") + + if not os.path.exists(file_path): + raise FileNotFoundError(f"JSON file not found at path: {file_path}") + + try: + with open(file_path, 'r') as f: + json.load(f) # Verify file contains valid JSON + except json.JSONDecodeError: + raise ValueError(f"File at {file_path} does not contain valid JSON data") + + return file_path + +def knee_RUL(cycle_list, max_cycle, MAXLIFE): + ''' + Piecewise linear function with zero gradient and unit gradient + ^ + | + MAXLIFE |----------- + | \ + | \ + | \ + | \ + | \ + |-----------------------> + ''' + knee_RUL_values = [] + if max_cycle >= MAXLIFE: + knee_point = max_cycle - MAXLIFE + + for i in range(0, len(cycle_list)): + if i < knee_point: + knee_RUL_values.append(MAXLIFE) + else: + tmp = knee_RUL_values[i - 1] - (MAXLIFE / (max_cycle - knee_point)) + knee_RUL_values.append(tmp) + else: + knee_point = MAXLIFE + print("=========== knee_point < MAXLIFE ===========") + for i in range(0, len(cycle_list)): + knee_point -= 1 + knee_RUL_values.append(knee_point) + + return knee_RUL_values + +def apply_piecewise_rul_to_data(df, cycle_col='time_in_cycles', max_life=125): + """ + Apply piecewise RUL transformation to single-engine data. + Uses original RUL values to determine proper failure point. + + Args: + df (pd.DataFrame): Input dataframe (single engine) + cycle_col (str): Column name for cycle/time + max_life (int): Maximum life parameter for knee_RUL function + + Returns: + pd.DataFrame: DataFrame with transformed RUL column + """ + df_copy = df.copy() + + # Check if cycle column exists + if cycle_col not in df_copy.columns: + logger.warning(f"Cycle column '{cycle_col}' not found. Using row index as cycle.") + df_copy[cycle_col] = range(1, len(df_copy) + 1) + + # Get cycle list for single engine + cycle_list = df_copy[cycle_col].tolist() + max_cycle_in_data = max(cycle_list) + + # Use original RUL values to determine true failure point + # Following the original GitHub pattern: max_cycle = max(cycle_list) + final_rul + # Get the final RUL value (RUL at the last cycle in our data) + final_rul = df_copy.loc[df_copy[cycle_col] == max_cycle_in_data, 'actual_RUL'].iloc[0] + # True failure point = last cycle in data + remaining RUL + true_max_cycle = max_cycle_in_data + final_rul + logger.info(f"Using original RUL data: final_rul={final_rul}, true_failure_cycle={true_max_cycle}") + + # Apply knee_RUL function with the true failure point + rul_values = knee_RUL(cycle_list, true_max_cycle, max_life) + + # Replace actual_RUL column with piecewise values + df_copy['actual_RUL'] = rul_values + + return df_copy + +class PlotComparisonToolConfig(FunctionBaseConfig, name="plot_comparison_tool"): + """ + AIQ Toolkit function to plot comparison of two y-axis columns against an x-axis column. 
+ """ + output_folder: str = Field(description="The path to the output folder to save plots.", default="./output_data") + +@register_function(config_type=PlotComparisonToolConfig) +async def plot_comparison_tool( + config: PlotComparisonToolConfig, builder: Builder +): + class PlotComparisonInputSchema(BaseModel): + data_json_path: str = Field(description="The path to the JSON file containing the data") + x_axis_column: str = Field(description="The column name for x-axis data", default="time_in_cycles") + y_axis_column_1: str = Field(description="The first column name for y-axis data", default="actual_RUL") + y_axis_column_2: str = Field(description="The second column name for y-axis data", default="predicted_RUL") + plot_title: str = Field(description="The title for the plot", default="Comparison Plot") + + def load_data_from_json(json_path: str): + """Load data from JSON file into a pandas DataFrame.""" + import pandas as pd + try: + with open(json_path, 'r') as f: + data = json.load(f) + return pd.DataFrame(data) + except FileNotFoundError: + logger.error(f"JSON file not found at {json_path}") + return None + except json.JSONDecodeError: + logger.error(f"Could not decode JSON from {json_path}") + return None + except Exception as e: + logger.error(f"Error loading data from '{json_path}': {e}") + return None + + def create_comparison_plot_html(output_dir: str, data_json_path: str, x_col: str, y_col_1: str, y_col_2: str, title: str): + """ + Generate and save comparison plot as HTML file using Bokeh. + + Args: + output_dir (str): Directory to save the plot file. + data_json_path (str): Path to the input JSON data file. + x_col (str): Column name for x-axis. + y_col_1 (str): Column name for first y-axis line. + y_col_2 (str): Column name for second y-axis line. + title (str): Plot title. + + Returns: + str: Path to the saved HTML file. + """ + import bokeh.plotting as bp + from bokeh.models import ColumnDataSource, HoverTool, Legend + from bokeh.embed import file_html + from bokeh.resources import CDN + + df = load_data_from_json(data_json_path) + if df is None or df.empty: + raise ValueError(f"Could not load data or data is empty from {data_json_path}") + + # Check required columns + required_columns = [x_col, y_col_1, y_col_2] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + raise KeyError(f"Data from {data_json_path} must contain columns: {required_columns}. 
Missing: {missing_columns}") + + # Apply piecewise RUL transformation if any column is "actual_RUL" + rul_transformation_applied = False + if y_col_1 == "actual_RUL" or y_col_2 == "actual_RUL": + logger.info("Applying piecewise RUL transformation...") + df = apply_piecewise_rul_to_data(df, x_col) + rul_transformation_applied = True + logger.info("Piecewise RUL transformation completed") + + # Sort by x-axis column for proper line plotting + df_sorted = df.sort_values(x_col) + + # Create data sources for each line + line1_source = ColumnDataSource(data=dict( + x=df_sorted[x_col], + y=df_sorted[y_col_1], + label=[y_col_1] * len(df_sorted) + )) + + line2_source = ColumnDataSource(data=dict( + x=df_sorted[x_col], + y=df_sorted[y_col_2], + label=[y_col_2] * len(df_sorted) + )) + + # Create hover tools + hover_line1 = HoverTool(tooltips=[ + (f"{x_col}", "@x"), + (f"{y_col_1}", "@y{0.0}"), + ("Type", "@label") + ], renderers=[]) + + hover_line2 = HoverTool(tooltips=[ + (f"{x_col}", "@x"), + (f"{y_col_2}", "@y{0.0}"), + ("Type", "@label") + ], renderers=[]) + + fig = bp.figure( + title=title, + x_axis_label=x_col, + y_axis_label='Value', + width=800, # Increased width to provide more space + height=450, + tools=['pan', 'box_zoom', 'wheel_zoom', 'reset', 'save'], + toolbar_location="above" + ) + + # Add the lines + line2_render = fig.line( + 'x', 'y', source=line2_source, + line_color='#2E8B57', line_width=3, alpha=0.9, + legend_label=y_col_2 + ) + + line1_render = fig.line( + 'x', 'y', source=line1_source, + line_color='#20B2AA', line_width=3, alpha=0.9, + line_dash='dashed', legend_label=y_col_1 + (" (Piecewise)" if y_col_1 == "actual_RUL" and rul_transformation_applied else "") + ) + + # Add hover tools to specific renderers + hover_line2.renderers = [line2_render] + hover_line1.renderers = [line1_render] + fig.add_tools(hover_line2, hover_line1) + + # Style the plot + fig.title.text_font_size = "16pt" + fig.title.align = "center" + fig.xaxis.axis_label_text_font_size = "14pt" + fig.yaxis.axis_label_text_font_size = "14pt" + + # Position legend in bottom right and style it + fig.legend.location = "bottom_right" + fig.legend.label_text_font_size = "12pt" + fig.legend.glyph_width = 30 + fig.legend.background_fill_alpha = 0.8 + fig.legend.background_fill_color = "white" + fig.legend.border_line_color = "gray" + fig.legend.border_line_width = 1 + fig.legend.padding = 10 + + fig.grid.grid_line_alpha = 0.3 + + # Set axis ranges for better visualization + y_min = min(df_sorted[y_col_1].min(), df_sorted[y_col_2].min()) + y_max = max(df_sorted[y_col_1].max(), df_sorted[y_col_2].max()) + y_range = y_max - y_min + fig.y_range.start = max(0, y_min - y_range * 0.05) + fig.y_range.end = y_max + y_range * 0.05 + + # Generate standalone HTML file + html_content = file_html(fig, CDN, title) + + # Save the HTML file + os.makedirs(output_dir, exist_ok=True) + output_filepath = os.path.join(output_dir, f"comparison_plot_{y_col_1}_vs_{y_col_2}.html") + with open(output_filepath, 'w', encoding='utf-8') as f: + f.write(html_content) + logger.info(f"Comparison plot saved to {output_filepath}") + + return output_filepath + + async def _response_fn(data_json_path: str, x_axis_column: str, y_axis_column_1: str, y_axis_column_2: str, plot_title: str) -> str: + """ + Process the input message and generate comparison plot. 
+ """ + try: + # Load data to validate columns exist + df = load_data_from_json(data_json_path) + if df is None or df.empty: + return "Could not load data or data is empty from the provided JSON file" + + # Check required columns + required_columns = [x_axis_column, y_axis_column_1, y_axis_column_2] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + return f"Data from {data_json_path} must contain columns: {required_columns}. Missing: {missing_columns}" + + output_filepath = create_comparison_plot_html( + output_dir=config.output_folder, + data_json_path=data_json_path, + x_col=x_axis_column, + y_col_1=y_axis_column_1, + y_col_2=y_axis_column_2, + title=plot_title + ) + + # Convert absolute path to file:// URL for proper browser handling + file_url = f"file://{output_filepath}" + + # Add info about RUL transformation if applied + rul_info = "" + if y_axis_column_1 == "actual_RUL" or y_axis_column_2 == "actual_RUL": + rul_info = f"\n- Piecewise RUL transformation applied (max_life=125)" + + # Return a clear completion message that the LLM will understand + return f""" + TASK COMPLETED SUCCESSFULLY\n\nComparison plot has been generated and saved. + \n\nChart Details:\n- + Type: Comparison plot with two lines\n- X-axis: {x_axis_column}\n- Y-axis Line 1: {y_axis_column_1} (dashed teal)\n- Y-axis Line 2: {y_axis_column_2} (solid green)\n- Title: {plot_title}{rul_info}\n- Output File: {output_filepath}\n- File URL: {file_url}\n\n✅ CHART GENERATION COMPLETE - NO FURTHER ACTION NEEDED + """ + + except FileNotFoundError as e: + error_msg = f"Required data file ('{data_json_path}') not found for comparison plot: {e}" + logger.error(error_msg) + return error_msg + except KeyError as ke: + error_msg = f"Missing required columns in '{data_json_path}' for comparison plot: {ke}" + logger.error(error_msg) + return error_msg + except ValueError as ve: + error_msg = f"Data validation error for comparison plot: {ve}" + logger.error(error_msg) + return error_msg + except Exception as e: + error_msg = f"Error generating comparison plot: {e}" + logger.error(error_msg) + return error_msg + + prompt = """ + Generate interactive comparison plot between two columns from JSON data. 
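+
+    Typical use (illustrative): to compare predicted RUL against actual RUL over time for
+    a results file produced by the RUL prediction tool, pass x_axis_column="time_in_cycles",
+    y_axis_column_1="actual_RUL" and y_axis_column_2="predicted_RUL".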
+ + Input: + - data_json_path: Path to the JSON file containing the data + - x_axis_column: Column name for x-axis data + - y_axis_column_1: Column name for first y-axis data + - y_axis_column_2: Column name for second y-axis data + - plot_title: Title for the plot + + Output: + - HTML file containing the comparison plot + """ + yield FunctionInfo.from_fn(_response_fn, + input_schema=PlotComparisonInputSchema, + description=prompt) + try: + pass + except GeneratorExit: + logger.info("Plot comparison function exited early!") + finally: + logger.info("Cleaning up plot_comparison_tool workflow.") diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_distribution_tool.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_distribution_tool.py new file mode 100644 index 000000000..3c0a5f1a1 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_distribution_tool.py @@ -0,0 +1,211 @@ +import json +import logging +import os + +from pydantic import Field, BaseModel + +from aiq.builder.builder import Builder +from aiq.builder.function_info import FunctionInfo +from aiq.cli.register_workflow import register_function +from aiq.data_models.function import FunctionBaseConfig + +logger = logging.getLogger(__name__) + +def verify_json_path(file_path: str) -> str: + """ + Verify that the input is a valid path to a JSON file. + + Args: + file_path (str): Path to verify + + Returns: + str: Verified file path + + Raises: + ValueError: If input is not a string or not a JSON file + FileNotFoundError: If file does not exist + json.JSONDecodeError: If file contains invalid JSON + """ + if not isinstance(file_path, str): + raise ValueError("Input must be a string path to a JSON file") + + if not file_path.lower().endswith('.json'): + raise ValueError("Input must be a path to a JSON file (ending with .json)") + + if not os.path.exists(file_path): + raise FileNotFoundError(f"JSON file not found at path: {file_path}") + + try: + with open(file_path, 'r') as f: + json.load(f) # Verify file contains valid JSON + except json.JSONDecodeError: + raise ValueError(f"File at {file_path} does not contain valid JSON data") + + return file_path + +class PlotDistributionToolConfig(FunctionBaseConfig, name="plot_distribution_tool"): + """ + AIQ Toolkit function to plot distribution histogram of a specified column. 
+ """ + output_folder: str = Field(description="The path to the output folder to save plots.", default="./output_data") + +@register_function(config_type=PlotDistributionToolConfig) +async def plot_distribution_tool( + config: PlotDistributionToolConfig, builder: Builder +): + class PlotDistributionInputSchema(BaseModel): + data_json_path: str = Field(description="The path to the JSON file containing the data") + column_name: str = Field(description="The column name to create distribution plot for", default="RUL") + plot_title: str = Field(description="The title for the plot", default="Distribution Plot") + + def load_data_from_json(json_path: str): + """Load data from JSON file into a pandas DataFrame.""" + import pandas as pd + try: + with open(json_path, 'r') as f: + data = json.load(f) + return pd.DataFrame(data) + except FileNotFoundError: + logger.error(f"JSON file not found at {json_path}") + return None + except json.JSONDecodeError: + logger.error(f"Could not decode JSON from {json_path}") + return None + except Exception as e: + logger.error(f"Error loading data from '{json_path}': {e}") + return None + + def create_distribution_plot_html(output_dir: str, data_json_path: str, column_name: str, title: str): + """ + Generate and save distribution histogram as HTML file using Bokeh. + + Args: + output_dir (str): Directory to save the plot file. + data_json_path (str): Path to the input JSON data file. + column_name (str): Column name to create distribution for. + title (str): Plot title. + + Returns: + str: Path to the saved HTML file. + """ + import bokeh.plotting as bp + from bokeh.models import ColumnDataSource, HoverTool + from bokeh.embed import file_html + from bokeh.resources import CDN + import pandas as pd + import numpy as np + + df = load_data_from_json(data_json_path) + if df is None or df.empty: + raise ValueError(f"Could not load data or data is empty from {data_json_path}") + + if column_name not in df.columns: + raise KeyError(f"Data from {data_json_path} must contain '{column_name}' column. 
Found: {df.columns.tolist()}") + + # Create histogram data + hist, edges = np.histogram(df[column_name], bins=30) + hist_df = pd.DataFrame({ + f'{column_name}_left': edges[:-1], + f'{column_name}_right': edges[1:], + 'Frequency': hist + }) + + source = ColumnDataSource(hist_df) + hover = HoverTool(tooltips=[ + (f"{column_name} Range", f"@{column_name}_left{{0.0}} - @{column_name}_right{{0.0}}"), + ("Frequency", "@Frequency") + ]) + + fig = bp.figure( + title=title, + x_axis_label=column_name, + y_axis_label='Frequency', + width=650, + height=450, + tools=[hover, 'pan', 'box_zoom', 'wheel_zoom', 'reset', 'save'], + toolbar_location="above" + ) + + # Add the histogram bars + fig.quad( + bottom=0, top='Frequency', left=f'{column_name}_left', right=f'{column_name}_right', + fill_color='#e17160', line_color='white', alpha=0.8, source=source + ) + + # Style the plot + fig.title.text_font_size = "14pt" + fig.xaxis.axis_label_text_font_size = "12pt" + fig.yaxis.axis_label_text_font_size = "12pt" + fig.grid.grid_line_alpha = 0.3 + + # Generate standalone HTML file + html_content = file_html(fig, CDN, title) + + # Save the HTML file + os.makedirs(output_dir, exist_ok=True) + output_filepath = os.path.join(output_dir, f"distribution_plot_{column_name}.html") + with open(output_filepath, 'w', encoding='utf-8') as f: + f.write(html_content) + logger.info(f"Distribution plot saved to {output_filepath}") + + return output_filepath + + async def _response_fn(data_json_path: str, column_name: str, plot_title: str) -> str: + """ + Process the input message and generate distribution histogram file. + """ + data_json_path = verify_json_path(data_json_path) + try: + # Load data to validate column exists + df = load_data_from_json(data_json_path) + if df is None or df.empty: + return "Could not load data or data is empty from the provided JSON file" + + if column_name not in df.columns: + return f"Column '{column_name}' not found in data. Available columns: {df.columns.tolist()}" + + output_filepath = create_distribution_plot_html( + output_dir=config.output_folder, + data_json_path=data_json_path, + column_name=column_name, + title=plot_title + ) + + # Convert absolute path to file:// URL for proper browser handling + file_url = f"file://{output_filepath}" + + # Return a clear completion message that the LLM will understand + return f"TASK COMPLETED SUCCESSFULLY\n\nDistribution histogram has been generated and saved.\n\nChart Details:\n- Type: Distribution histogram (30 bins)\n- Column: {column_name}\n- Title: {plot_title}\n- Output File: {output_filepath}\n- File URL: {file_url}\n\n✅ CHART GENERATION COMPLETE - NO FURTHER ACTION NEEDED" + + except FileNotFoundError as e: + error_msg = f"Required data file ('{data_json_path}') not found for distribution plot: {e}" + logger.error(error_msg) + return error_msg + except KeyError as ke: + error_msg = f"Missing expected column '{column_name}' in '{data_json_path}' for distribution plot: {ke}" + logger.error(error_msg) + return error_msg + except Exception as e: + error_msg = f"Error generating distribution histogram: {e}" + logger.error(error_msg) + return error_msg + + prompt = """ + Generate interactive distribution histogram from JSON data. 
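+
+    Typical use (illustrative): plot the spread of true RUL values across engine units by
+    passing column_name="RUL" for a JSON file produced by the SQL retrieval tool.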
+ Input: + - data_json_path: Path to the JSON file containing the data + - column_name: Column name for the distribution histogram + - plot_title: Title for the plot + + Output: + - HTML file containing the distribution histogram + """ + yield FunctionInfo.from_fn(_response_fn, + input_schema=PlotDistributionInputSchema, + description=prompt) + try: + pass + except GeneratorExit: + logger.info("Plot distribution function exited early!") + finally: + logger.info("Cleaning up plot_distribution_tool workflow.") diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_line_chart_tool.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_line_chart_tool.py new file mode 100644 index 000000000..4c1047c02 --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/plot_line_chart_tool.py @@ -0,0 +1,242 @@ +import json +import logging +import os + +from pydantic import Field, BaseModel + +from aiq.builder.builder import Builder +from aiq.builder.function_info import FunctionInfo +from aiq.cli.register_workflow import register_function +from aiq.data_models.function import FunctionBaseConfig + +logger = logging.getLogger(__name__) + +def verify_json_path(file_path: str) -> str: + """ + Verify that the input is a valid path to a JSON file. + + Args: + file_path (str): Path to verify + + Returns: + str: Verified file path + + Raises: + ValueError: If input is not a string or not a JSON file + FileNotFoundError: If file does not exist + json.JSONDecodeError: If file contains invalid JSON + """ + if not isinstance(file_path, str): + raise ValueError("Input must be a string path to a JSON file") + + if not file_path.lower().endswith('.json'): + raise ValueError("Input must be a path to a JSON file (ending with .json)") + + if not os.path.exists(file_path): + raise FileNotFoundError(f"JSON file not found at path: {file_path}") + + try: + with open(file_path, 'r') as f: + json.load(f) # Verify file contains valid JSON + except json.JSONDecodeError: + raise ValueError(f"File at {file_path} does not contain valid JSON data") + + return file_path + +class PlotLineChartToolConfig(FunctionBaseConfig, name="plot_line_chart_tool"): + """ + AIQ Toolkit function to plot a line chart with specified x and y axis columns. 
+ """ + output_folder: str = Field(description="The path to the output folder to save plots.", default="./output_data") + +@register_function(config_type=PlotLineChartToolConfig) +async def plot_line_chart_tool( + config: PlotLineChartToolConfig, builder: Builder +): + class PlotLineChartInputSchema(BaseModel): + data_json_path: str = Field(description="The path to the JSON file containing the data") + x_axis_column: str = Field(description="The column name for x-axis data", default="time_in_cycles") + y_axis_column: str = Field(description="The column name for y-axis data", default="RUL") + plot_title: str = Field(description="The title for the plot", default="Line Chart") + + def load_data_from_json(json_path: str): + """Load data from JSON file into a pandas DataFrame.""" + import pandas as pd + try: + with open(json_path, 'r') as f: + data = json.load(f) + return pd.DataFrame(data) + except FileNotFoundError: + logger.error(f"JSON file not found at {json_path}") + return None + except json.JSONDecodeError: + logger.error(f"Could not decode JSON from {json_path}") + return None + except Exception as e: + logger.error(f"Error loading data from '{json_path}': {e}") + return None + + def create_line_chart_plot_html(output_dir: str, data_json_path: str, x_col: str, y_col: str, title: str): + """ + Generate and save line chart as HTML file using Bokeh. + + Args: + output_dir (str): Directory to save the plot file. + data_json_path (str): Path to the input JSON data file. + x_col (str): Column name for x-axis. + y_col (str): Column name for y-axis. + title (str): Plot title. + + Returns: + str: Path to the saved HTML file. + """ + import bokeh.plotting as bp + from bokeh.models import ColumnDataSource, HoverTool + from bokeh.embed import file_html + from bokeh.resources import CDN + import pandas as pd + + df = load_data_from_json(data_json_path) + if df is None or df.empty: + raise ValueError(f"Could not load data or data is empty from {data_json_path}") + + # Check required columns + required_columns = [x_col, y_col] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + raise KeyError(f"Data from {data_json_path} must contain columns: {required_columns}. 
Missing: {missing_columns}") + + # Sort by x-axis column for proper line plotting + df_sorted = df.sort_values(x_col) + + # Create data source + source = ColumnDataSource(data=dict( + x=df_sorted[x_col], + y=df_sorted[y_col] + )) + + # Create hover tool + hover = HoverTool(tooltips=[ + (f"{x_col}", "@x"), + (f"{y_col}", "@y{0.00}") + ]) + + fig = bp.figure( + title=title, + x_axis_label=x_col, + y_axis_label=y_col, + width=650, + height=450, + tools=[hover, 'pan', 'box_zoom', 'wheel_zoom', 'reset', 'save'], + toolbar_location="above" + ) + + # Add the line + fig.line( + 'x', 'y', source=source, + line_color='#1f77b4', line_width=3, alpha=0.9 + ) + + # Add circle markers for data points + fig.circle( + 'x', 'y', source=source, + size=6, color='#1f77b4', alpha=0.7 + ) + + # Style the plot + fig.title.text_font_size = "16pt" + fig.title.align = "center" + fig.xaxis.axis_label_text_font_size = "14pt" + fig.yaxis.axis_label_text_font_size = "14pt" + fig.grid.grid_line_alpha = 0.3 + + # Set axis ranges for better visualization + y_min = df_sorted[y_col].min() + y_max = df_sorted[y_col].max() + y_range = y_max - y_min + if y_range > 0: + fig.y_range.start = y_min - y_range * 0.05 + fig.y_range.end = y_max + y_range * 0.05 + + # Generate standalone HTML file + html_content = file_html(fig, CDN, title) + + # Save the HTML file + os.makedirs(output_dir, exist_ok=True) + output_filepath = os.path.join(output_dir, f"line_chart_{x_col}_vs_{y_col}.html") + with open(output_filepath, 'w', encoding='utf-8') as f: + f.write(html_content) + logger.info(f"Line chart saved to {output_filepath}") + + return output_filepath + + async def _response_fn(data_json_path: str, x_axis_column: str, y_axis_column: str, plot_title: str) -> str: + """ + Process the input message and generate line chart. + """ + data_json_path = verify_json_path(data_json_path) + + try: + # Load data to validate columns exist + df = load_data_from_json(data_json_path) + if df is None or df.empty: + return "Could not load data or data is empty from the provided JSON file" + + # Check required columns + required_columns = [x_axis_column, y_axis_column] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + return f"Data from {data_json_path} must contain columns: {required_columns}. 
Missing: {missing_columns}" + + output_filepath = create_line_chart_plot_html( + output_dir=config.output_folder, + data_json_path=data_json_path, + x_col=x_axis_column, + y_col=y_axis_column, + title=plot_title + ) + + # Convert absolute path to file:// URL for proper browser handling + file_url = f"file://{output_filepath}" + + # Return a clear completion message that the LLM will understand + return f"TASK COMPLETED SUCCESSFULLY\n\nLine chart has been generated and saved.\n\nChart Details:\n- Type: Line chart with markers\n- X-axis: {x_axis_column}\n- Y-axis: {y_axis_column}\n- Title: {plot_title}\n- Output File: {output_filepath}\n- File URL: {file_url}\n\n✅ CHART GENERATION COMPLETE - NO FURTHER ACTION NEEDED" + + except FileNotFoundError as e: + error_msg = f"Required data file ('{data_json_path}') not found for line chart: {e}" + logger.error(error_msg) + return error_msg + except KeyError as ke: + error_msg = f"Missing required columns in '{data_json_path}' for line chart: {ke}" + logger.error(error_msg) + return error_msg + except ValueError as ve: + error_msg = f"Data validation error for line chart: {ve}" + logger.error(error_msg) + return error_msg + except Exception as e: + error_msg = f"Error generating line chart: {e}" + logger.error(error_msg) + return error_msg + + prompt = """ + Generate interactive line chart from JSON data. + + Input: + - data_json_path: Path to the JSON file containing the data + - x_axis_column: Column name for x-axis data + - y_axis_column: Column name for y-axis data + - plot_title: Title for the plot + + Output: + - HTML file containing the line chart + """ + yield FunctionInfo.from_fn(_response_fn, + input_schema=PlotLineChartInputSchema, + description=prompt) + try: + pass + except GeneratorExit: + logger.info("Plot line chart function exited early!") + finally: + logger.info("Cleaning up plot_line_chart_tool workflow.") diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/predict_rul_tool.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/predict_rul_tool.py new file mode 100644 index 000000000..e236545ee --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/predict_rul_tool.py @@ -0,0 +1,251 @@ +import json +import logging +import os +import warnings +import pickle +import joblib +import numpy as np + +from pydantic import Field, BaseModel + +from aiq.builder.builder import Builder +from aiq.builder.function_info import FunctionInfo +from aiq.cli.register_workflow import register_function +from aiq.data_models.function import FunctionBaseConfig + +logger = logging.getLogger(__name__) + +def verify_json_path(file_path: str) -> str: + """ + Verify that the input is a valid path to a JSON file. 
+
+    Args:
+        file_path (str): Path to verify
+
+    Returns:
+        str: Verified file path
+
+    Raises:
+        ValueError: If input is not a string, does not end with .json, or does not contain valid JSON
+        FileNotFoundError: If file does not exist
+    """
+    if not isinstance(file_path, str):
+        raise ValueError("Input must be a string path to a JSON file")
+
+    if not file_path.lower().endswith('.json'):
+        raise ValueError("Input must be a path to a JSON file (ending with .json)")
+
+    if not os.path.exists(file_path):
+        raise FileNotFoundError(f"JSON file not found at path: {file_path}")
+
+    try:
+        with open(file_path, 'r') as f:
+            json.load(f)  # Verify file contains valid JSON
+    except json.JSONDecodeError:
+        raise ValueError(f"File at {file_path} does not contain valid JSON data")
+
+    return file_path
+
+class PredictRulToolConfig(FunctionBaseConfig, name="predict_rul_tool"):
+    """
+    AIQ Toolkit function to predict RUL (Remaining Useful Life) using trained models and provided data.
+    """
+    # Runtime configuration parameters
+    scaler_path: str = Field(description="Path to the trained StandardScaler model.", default="./models/scaler_model.pkl")
+    model_path: str = Field(description="Path to the trained XGBoost model.", default="./models/xgb_model_fd001.pkl")
+    output_folder: str = Field(description="The path to the output folder to save prediction results.", default="./output_data")
+
+@register_function(config_type=PredictRulToolConfig)
+async def predict_rul_tool(
+    config: PredictRulToolConfig, builder: Builder
+):
+    class PredictRulInputSchema(BaseModel):
+        json_file_path: str = Field(description="Path to a JSON file containing sensor measurements data for RUL prediction")
+
+    def load_data_from_json(json_path: str):
+        """Load data from JSON file into a pandas DataFrame."""
+        import pandas as pd
+        try:
+            with open(json_path, 'r') as f:
+                data = json.load(f)
+            return pd.DataFrame(data)
+        except FileNotFoundError:
+            logger.warn(f"JSON file not found at {json_path}")
+            return None
+        except json.JSONDecodeError:
+            logger.warn(f"Could not decode JSON from {json_path}")
+            return None
+        except Exception as e:
+            logger.warn(f"Error loading data from '{json_path}': {e}")
+            return None
+
+    def predict_rul_from_data(data_json_path: str, scaler_path: str, model_path: str, output_dir: str):
+        """
+        Load data and trained models to make RUL predictions.
+
+        Args:
+            data_json_path (str): Path to the input JSON data file.
+            scaler_path (str): Path to the trained StandardScaler model.
+            model_path (str): Path to the trained XGBoost model.
+            output_dir (str): Directory to save prediction results (unused - kept for compatibility).
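+
+        Expected input (illustrative sketch; the numeric values are made up): a JSON array
+        of records, each containing the twelve sensor_measurement_* feature columns, e.g.
+
+            [{"unit_number": 24, "time_in_cycles": 1, "sensor_measurement_2": 641.8, ...},
+             {"unit_number": 24, "time_in_cycles": 2, "sensor_measurement_2": 642.1, ...}]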
+ + Returns: + tuple: (predictions array, original file path) + """ + import pandas as pd + + # Suppress warnings + warnings.filterwarnings("ignore", message="X does not have valid feature names") + + # Load the data + df = load_data_from_json(data_json_path) + if df is None or df.empty: + raise ValueError(f"Could not load data or data is empty from {data_json_path}") + + # Prepare features for prediction (exclude non-feature columns if present) + required_columns = ['sensor_measurement_2', + 'sensor_measurement_3', + 'sensor_measurement_4', + 'sensor_measurement_7', + 'sensor_measurement_8', + 'sensor_measurement_11', + 'sensor_measurement_12', + 'sensor_measurement_13', + 'sensor_measurement_15', + 'sensor_measurement_17', + 'sensor_measurement_20', + 'sensor_measurement_21'] + feature_columns = [col for col in df.columns if col in required_columns] + if not feature_columns: + raise ValueError(f"No valid feature columns found in the data. Available columns: {df.columns.tolist()}") + + X_test = df[feature_columns].values + logger.info(f"Using {len(feature_columns)} features for prediction: {feature_columns}") + + # Load the StandardScaler + try: + scaler_loaded = joblib.load(scaler_path) + logger.info(f"Successfully loaded scaler from {scaler_path}") + except Exception as e: + raise FileNotFoundError(f"Could not load scaler from {scaler_path}: {e}") + + # Transform the test data using the loaded scaler + X_test_scaled = scaler_loaded.transform(X_test) + + # Load the XGBoost model + try: + with open(model_path, 'rb') as f: + xgb_model = pickle.load(f) + logger.info(f"Successfully loaded XGBoost model from {model_path}") + except Exception as e: + raise FileNotFoundError(f"Could not load XGBoost model from {model_path}: {e}") + + # Make predictions + y_pred = xgb_model.predict(X_test_scaled) + logger.info(f"Generated {len(y_pred)} RUL predictions") + + # Create results DataFrame + results_df = df.copy() + results_df = results_df.rename(columns={'RUL': 'actual_RUL'}) + results_df['predicted_RUL'] = y_pred + + # Save results back to the original JSON file + results_json = results_df.to_dict('records') + with open(data_json_path, 'w') as f: + json.dump(results_json, f, indent=2) + + logger.info(f"Prediction results saved back to original file: {data_json_path}") + + return y_pred, data_json_path + + async def _response_fn(json_file_path: str) -> str: + """ + Process the input message and generate RUL predictions using trained models. + """ + logger.info(f"Input message: {json_file_path}") + data_json_path = verify_json_path(json_file_path) + try: + predictions, output_filepath = predict_rul_from_data( + data_json_path=data_json_path, + scaler_path=config.scaler_path, + model_path=config.model_path, + output_dir=config.output_folder + ) + + # Generate summary statistics + avg_rul = np.mean(predictions) + min_rul = np.min(predictions) + max_rul = np.max(predictions) + std_rul = np.std(predictions) + + # Create response with prediction summary + response = f"""RUL predictions generated successfully! 📊 + +**Prediction Summary:** +- **Total predictions:** {len(predictions)} +- **Average RUL:** {avg_rul:.2f} cycles +- **Minimum RUL:** {min_rul:.2f} cycles +- **Maximum RUL:** {max_rul:.2f} cycles +- **Standard Deviation:** {std_rul:.2f} cycles + +**Results saved to:** {output_filepath} + +The predictions have been added to the original dataset with column name 'predicted_RUL'. The original JSON file has been updated with the RUL predictions. 
+All columns from the original dataset have been preserved, and the predicted RUL column has been renamed to 'predicted_RUL' and the actual RUL column has been renamed to 'actual_RUL'.""" + + return response + + except FileNotFoundError as e: + error_msg = f"Required file not found for RUL prediction: {e}. Please ensure all model files and data are available." + logger.warn(error_msg) + return error_msg + except ValueError as ve: + error_msg = f"Data validation error for RUL prediction: {ve}. Check the input data format." + logger.warn(error_msg) + return error_msg + except Exception as e: + error_msg = f"Error during RUL prediction: {e}" + logger.warn(error_msg) + return error_msg + + prompt = """ + Predict RUL (Remaining Useful Life) for turbofan engines using trained machine learning models. + + Input: + - Path to a JSON file containing sensor measurements. + + Required columns: + * sensor_measurement_2 + * sensor_measurement_3 + * sensor_measurement_4 + * sensor_measurement_7 + * sensor_measurement_8 + * sensor_measurement_11 + * sensor_measurement_12 + * sensor_measurement_13 + * sensor_measurement_15 + * sensor_measurement_17 + * sensor_measurement_20 + * sensor_measurement_21 + + Process: + 1. Load and preprocess data using StandardScaler + 2. Generate predictions using XGBoost model + 3. Calculate summary statistics (mean, min, max, std dev) + 4. Save predictions to JSON file + + Output: + - RUL predictions for each engine unit + - Summary statistics of predictions + - Updated JSON file with predictions added as 'predicted_RUL' column + """ + yield FunctionInfo.from_fn(_response_fn, + input_schema=PredictRulInputSchema, + description=prompt) + try: + pass + except GeneratorExit: + logger.info("Predict RUL function exited early!") + finally: + logger.info("Cleaning up predict_rul_tool workflow.") diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/register.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/register.py new file mode 100644 index 000000000..53df6e30f --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/register.py @@ -0,0 +1,16 @@ +# pylint: disable=unused-import +# flake8: noqa + +# Import any tools which need to be automatically registered here +from . import generate_sql_query_and_retrieve_tool +from . import predict_rul_tool +from . import plot_distribution_tool +from . import plot_comparison_tool +from . import plot_line_chart_tool +# from . import plot_sensor_over_RUL_tool +# from . import load_models_and_predict_RUL_tool +# from . import plot_anomaly_tool +# from . import distribution_histogram_rul_tool +# from . import plot_prediction_errors_tool +# from . import plot_rul_error_by_health_state_tool +# from . 
import generate_chart_tool diff --git a/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/vanna_util.py b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/vanna_util.py new file mode 100644 index 000000000..16c6eff1d --- /dev/null +++ b/industries/manufacturing/predictive_maintenance_agent/src/predictive_maintenance_agent/vanna_util.py @@ -0,0 +1,259 @@ +from vanna.chromadb import ChromaDB_VectorStore +from vanna.base import VannaBase +from langchain_nvidia import ChatNVIDIA +from tqdm import tqdm + +class NIMCustomLLM(VannaBase): + def __init__(self, config=None): + VannaBase.__init__(self, config=config) + + if not config: + raise ValueError("config must be passed") + + # default parameters - can be overrided using config + self.temperature = 0.7 + + if "temperature" in config: + self.temperature = config["temperature"] + + # If only config is passed + if "api_key" not in config: + raise ValueError("config must contain a NIM api_key") + + if "model" not in config: + raise ValueError("config must contain a NIM model") + + api_key = config["api_key"] + model = config["model"] + + # Initialize ChatNVIDIA client + self.client = ChatNVIDIA( + api_key=api_key, + model=model, + temperature=self.temperature, + ) + self.model = model + + def system_message(self, message: str) -> any: + return {"role": "system", "content": message+"\n DO NOT PRODUCE MARKDOWN, ONLY RESPOND IN PLAIN TEXT"} + + def user_message(self, message: str) -> any: + return {"role": "user", "content": message} + + def assistant_message(self, message: str) -> any: + return {"role": "assistant", "content": message} + + def submit_prompt(self, prompt, **kwargs) -> str: + if prompt is None: + raise Exception("Prompt is None") + + if len(prompt) == 0: + raise Exception("Prompt is empty") + + # Count the number of tokens in the message log + # Use 4 as an approximation for the number of characters per token + num_tokens = 0 + for message in prompt: + num_tokens += len(message["content"]) / 4 + print(f"Using model {self.model} for {num_tokens} tokens (approx)") + + response = self.client.invoke(prompt) + return response.content + +class NIMVanna(ChromaDB_VectorStore, NIMCustomLLM): + def __init__(self, VectorConfig = None, LLMConfig = None): + ChromaDB_VectorStore.__init__(self, config=VectorConfig) + NIMCustomLLM.__init__(self, config=LLMConfig) + +class CustomEmbeddingFunction: + """ + A class that can be used as a replacement for chroma's DefaultEmbeddingFunction. + It takes in input (text or list of texts) and returns embeddings using NVIDIA's API. + """ + + def __init__(self, api_key, model="nvidia/nv-embedqa-e5-v5"): + """ + Initialize the embedding function with the API key and model name. + + Parameters: + - api_key (str): The API key for authentication. + - model (str): The model name to use for embeddings (default is "nvidia/nv-embedqa-e5-v5"). + """ + from langchain_nvidia import NVIDIAEmbeddings + + self.embeddings = NVIDIAEmbeddings( + api_key=api_key, + model_name=model, + input_type="query", + truncate="NONE" + ) + + def __call__(self, input): + """ + Call method to make the object callable, as required by chroma's EmbeddingFunction interface. + + Parameters: + - input (str or list): The input data for which embeddings need to be generated. + + Returns: + - embedding (list): The embedding vector(s) for the input data. 
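+
+        Example (illustrative; `nvidia_api_key` is a placeholder for a real key):
+
+            embed_fn = CustomEmbeddingFunction(api_key=nvidia_api_key)
+            single_vector = embed_fn("average RUL per unit in train_FD001")
+            batch_vectors = embed_fn(["first query", "second query"])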
+ """ + # Ensure input is a list, as required by the API + input_data = [input] if isinstance(input, str) else input + + # Generate embeddings + embeddings = [] + for text in input_data: + embedding = self.embeddings.embed_query(text) + embeddings.append(embedding) + + return embeddings[0] if len(embeddings) == 1 and isinstance(input, str) else embeddings + +def initVanna(vn): + # Get and train DDL from sqlite_master + df_ddl = vn.run_sql("SELECT type, sql FROM sqlite_master WHERE sql is not null") + for ddl in df_ddl['sql'].to_list(): + vn.train(ddl=ddl) + + # Define FD datasets and DDL for RUL tables + fd_datasets = ["FD001", "FD002", "FD003", "FD004"] + for fd in fd_datasets: + vn.train(ddl=f""" + CREATE TABLE IF NOT EXISTS RUL_{fd} ( + "unit_number" INTEGER, + "RUL" INTEGER + ) + """) + + # Common sensor columns for train and test tables + sensor_columns = """ + "unit_number" INTEGER, + "time_in_cycles" INTEGER, + "operational_setting_1" REAL, + "operational_setting_2" REAL, + "operational_setting_3" REAL, + "sensor_measurement_1" REAL, + "sensor_measurement_2" REAL, + "sensor_measurement_3" REAL, + "sensor_measurement_4" REAL, + "sensor_measurement_5" REAL, + "sensor_measurement_6" REAL, + "sensor_measurement_7" REAL, + "sensor_measurement_8" REAL, + "sensor_measurement_9" REAL, + "sensor_measurement_10" REAL, + "sensor_measurement_11" REAL, + "sensor_measurement_12" REAL, + "sensor_measurement_13" REAL, + "sensor_measurement_14" REAL, + "sensor_measurement_15" REAL, + "sensor_measurement_16" REAL, + "sensor_measurement_17" INTEGER, + "sensor_measurement_18" INTEGER, + "sensor_measurement_19" REAL, + "sensor_measurement_20" REAL, + "sensor_measurement_21" REAL + """ + + # Create train and test tables for each FD dataset + for fd in fd_datasets: + vn.train(ddl=f"CREATE TABLE IF NOT EXISTS train_{fd} ({sensor_columns})") + vn.train(ddl=f"CREATE TABLE IF NOT EXISTS test_{fd} ({sensor_columns})") + + dataset_documentation = """ + Data sets (FD001, FD002, FD003, FD004) consists of multiple multivariate time series. Each data set is further divided into training and test subsets. + Each time series is from a different engine � i.e., the data can be considered to be from a fleet of engines of the same type. + Each engine starts with different degrees of initial wear and manufacturing variation which is unknown to the user. T + his wear and variation is considered normal, i.e., it is not considered a fault condition. There are three operational settings that have a substantial effect on engine performance. + These settings are also included in the data. The data is contaminated with sensor noise. + + The engine is operating normally at the start of each time series, and develops a fault at some point during the series. + In the training set, the fault grows in magnitude until system failure. In the test set, the time series ends some time prior to system failure. + The objective is to predict the number of remaining operational cycles before failure in the test set, i.e., the number of operational cycles after the last cycle that the engine will continue to operate. + Also provided a vector of true Remaining Useful Life (RUL) values only for the test data. + + The data are provided as a zip-compressed text file with 26 columns of numbers, separated by spaces. + Each row is a snapshot of data taken during a single operational cycle, each column is a different variable. 
The columns correspond to: + 1) unit number + 2) time, in cycles + 3) operational setting 1 + 4) operational setting 2 + 5) operational setting 3 + 6) sensor measurement 1 + 7) sensor measurement 2 + ... + 26) sensor measurement 26 + """ + + # Additional training examples + vn.train(documentation=dataset_documentation) + + # Additional training examples + # vn.train(documentation="The engine_sensor_data table tracks operational settings and sensor measurements for multiple engines in a fleet. It includes Remaining Useful Life (RUL) predictions for each engine.") + + queries = [ + "SELECT * FROM train_FD001 AS t JOIN RUL_FD001 AS r ON t.unit_number = r.unit_number ORDER BY t.time_in_cycles DESC LIMIT 10", + "SELECT unit_number, AVG(sensor_measurement_1), AVG(sensor_measurement_2), AVG(sensor_measurement_3) FROM train_FD001 GROUP BY unit_number", + "SELECT unit_number, SUM(sensor_measurement_1), SUM(sensor_measurement_2), SUM(sensor_measurement_3) FROM train_FD001 GROUP BY unit_number", + "SELECT * FROM train_FD002 WHERE time_in_cycles BETWEEN 50 AND 100", + "SELECT * FROM train_FD003 WHERE unit_number = 1 ORDER BY time_in_cycles ASC", + """ + SELECT unit_number, + MAX(sensor_measurement_1) AS max_sensor1, + MIN(sensor_measurement_1) AS min_sensor1, + AVG(sensor_measurement_1) AS avg_sensor1 + FROM train_FD004 + GROUP BY unit_number + """, + """ + SELECT unit_number, + AVG(sensor_measurement_5) AS avg_sensor5, + AVG(sensor_measurement_10) AS avg_sensor10 + FROM train_FD001 + GROUP BY unit_number + """, + "SELECT unit_number, MAX(time_in_cycles) AS last_cycle FROM train_FD002 GROUP BY unit_number", + "SELECT * FROM train_FD003 WHERE sensor_measurement_17 < 0 OR sensor_measurement_18 < 0", + """ + SELECT unit_number, + STDDEV(sensor_measurement_1) AS std_sensor1, + STDDEV(sensor_measurement_2) AS std_sensor2, + STDDEV(sensor_measurement_3) AS std_sensor3 + FROM train_FD001 + WHERE unit_number = 2 + GROUP BY unit_number + """, + """ + SELECT unit_number, + SUM(sensor_measurement_1) AS sum_sensor1, + SUM(sensor_measurement_2) AS sum_sensor2, + SUM(sensor_measurement_3) AS sum_sensor3, + SUM(sensor_measurement_4) AS sum_sensor4, + SUM(sensor_measurement_5) AS sum_sensor5 + FROM train_FD004 + GROUP BY unit_number + """, + "SELECT * FROM test_FD002 WHERE sensor_measurement_2 > 100", + """ + SELECT unit_number, + MAX(sensor_measurement_3) AS max_sensor3, + MIN(sensor_measurement_6) AS min_sensor6, + AVG(sensor_measurement_9) AS avg_sensor9 + FROM test_FD003 + GROUP BY unit_number + """, + "SELECT * FROM test_FD004 WHERE unit_number IN (1, 2, 3) ORDER BY time_in_cycles ASC", + ] + + for query in tqdm(queries, desc="Training NIMVanna"): + vn.train(sql=query) + + # Additional specific training cases + vn.train(question="Retrieve the time_in_cycles and operational_setting_1 from the test_FD001 for all records where the unit_id is equal to 1.", + sql="SELECT time_in_cycles, operational_setting_1 FROM test_FD001 WHERE unit_number = 1") + vn.train(question="Retrieve the time_in_cycles and sensor_measurement_1 from the test_FD001 for all records where the unit_id is equal to 1.", + sql="SELECT time_in_cycles, sensor_measurement_1 FROM test_FD001 WHERE unit_number = 1") + vn.train(question="Retrieve RUL of each unit from the train_FD001", + sql="SELECT unit_number, MAX(time_in_cycles) AS RUL FROM train_FD001 GROUP BY unit_number") + vn.train(question="Retrieve the unit_number, time_in_cycles, real time Remaining Useful Life (RUL), and sensor_measurement_3 from table train_FD001, ordered by unit_number and 
time_in_cycles.", sql="SELECT unit_number, time_in_cycles, MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles AS RUL, sensor_measurement_3 FROM train_FD001 ORDER BY unit_number, time_in_cycles") +
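For reference, a minimal sketch of how `NIMVanna`, `CustomEmbeddingFunction`, and `initVanna` might be wired together. The packaged SQL retriever does this inside `generate_sql_query_and_retrieve_tool`, which is not part of this excerpt; the model name, database path, and the `path`/`embedding_function` vector-store keys below are assumptions.

```python
import os

from predictive_maintenance_agent.vanna_util import (
    CustomEmbeddingFunction,
    NIMVanna,
    initVanna,
)

api_key = os.environ["NVIDIA_API_KEY"]

# Hypothetical wiring: a persistent ChromaDB vector store plus a NIM-backed LLM.
vn = NIMVanna(
    VectorConfig={
        "path": "./vanna_chromadb",  # assumed ChromaDB persistence directory
        "embedding_function": CustomEmbeddingFunction(api_key=api_key),
    },
    LLMConfig={
        "api_key": api_key,
        "model": "meta/llama-3.1-70b-instruct",  # placeholder NIM model name
        "temperature": 0.2,
    },
)

vn.connect_to_sqlite("./data/nasa_turbo.db")  # assumed path created by setup_database.py
initVanna(vn)  # train on DDL, dataset documentation, and the example SQL above

print(vn.generate_sql("Retrieve RUL of each unit from the train_FD001"))
```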