diff --git a/02-samples/11-customer-support-solar-panel/.env.example b/02-samples/11-customer-support-solar-panel/.env.example new file mode 100644 index 00000000..b7de3685 --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/.env.example @@ -0,0 +1,18 @@ +# JIRA Integration Configuration +# Rename this file to .env and update with your own credentials + +# Your Atlassian JIRA URL (ending with a slash) +JIRA_INSTANCE_URL="https://your-domain.atlassian.net/" + +# Your JIRA account email +JIRA_USERNAME="your-email@example.com" + +# API token generated from your Atlassian account +# Generate at: https://id.atlassian.com/manage-profile/security/api-tokens +JIRA_API_TOKEN="your-api-token-here" + +# Set to True if using Jira Cloud +JIRA_CLOUD="True" + +# The name of your JIRA project for tickets +PROJECT_NAME="SOLAR" \ No newline at end of file diff --git a/02-samples/11-customer-support-solar-panel/01_solar_panel_customer_support_setup.ipynb b/02-samples/11-customer-support-solar-panel/01_solar_panel_customer_support_setup.ipynb new file mode 100644 index 00000000..1a340d4b --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/01_solar_panel_customer_support_setup.ipynb @@ -0,0 +1,651 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solar Panel Customer Support - Setup\n", + "\n", + "In this notebook, we'll set up Amazon resources for our Solar Panel Customer Support Agent. We'll be:\n", + "\n", + "1. Creating [Amazon Bedrock Knowledge Base](https://aws.amazon.com/bedrock/knowledge-bases/) for solar panel installation and maintenance information from text files.\n", + "2. Creating [Amazon Bedrock Guardrails](https://aws.amazon.com/bedrock/guardrails/) to safeguard customer interactions.\n", + "3. Creating [Amazon DynamoDb](https://aws.amazon.com/dynamodb/) for storing customer profiles and data.\n", + " \n", + "\n", + "These components will later be integrated into our Solar Panel Customer Support agent." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Import Required Libraries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Install prerequisites\n", + "!pip install --upgrade -q boto3\n", + "%pip install -qr requirements.txt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import time\n", + "import boto3\n", + "import logging\n", + "import pprint\n", + "import json\n", + "import uuid\n", + "import requests\n", + "\n", + "# Set up logging\n", + "logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)\n", + "logger = logging.getLogger(__name__)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set up AWS Clients\n", + "Please set up your AWS credentials in your environment." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Set up AWS clients\n", + "session = boto3.Session()\n", + "region = session.region_name\n", + "sts_client = session.client('sts')\n", + "s3_client = session.client('s3')\n", + "bedrock_client = session.client('bedrock')\n", + "account_id = sts_client.get_caller_identity()[\"Account\"]\n", + "account_id_suffix = account_id[:3]\n", + "#suffix = f\"{region}-{account_id_suffix}\"\n", + "bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')\n", + "bedrock_runtime = boto3.client('bedrock-runtime')\n", + "\n", + "# Display region and account information\n", + "print(f\"Region: {region}\")\n", + "#print(f\"Account ID: {account_id}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get the current timestamp\n", + "current_time = time.time()\n", + "# Format the timestamp as a string\n", + "timestamp_str = time.strftime(\"%Y%m%d%H%M%S\", 
time.localtime(current_time))[-7:]\n", + "# Create the suffix using the timestamp\n", + "suffix = f\"{timestamp_str}\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Amazon Bedrock Knowledge Base for Solar Panel Manuals\n", + "\n", + "Download Amazon Bedrock Knowledge Bases helper" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "url = \"https://raw.githubusercontent.com/aws-samples/amazon-bedrock-samples/main/rag/knowledge-bases/features-examples/utils/knowledge_base.py\"\n", + "target_path = \"utils/knowledge_base.py\"\n", + "response = requests.get(url)\n", + "with open(target_path, \"w\") as f:\n", + " f.write(response.text)\n", + "print(f\"Downloaded Knowledge Bases utils to {target_path}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create Amazon Bedrock Knowledge Base\n", + "In this section we will configure the Amazon Bedrock Knowledge Base containing the solar panel manuals for installation and maintenance. We will be using Amazon OpenSearch Serverless as the underlying vector store and Amazon S3 as the data source containing the files." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from utils.knowledge_base import BedrockKnowledgeBase\n", + "\n", + "knowledge_base_name = f\"solar-panel-manuals-knowledge-base-{suffix}\"\n", + "knowledge_base_description = \"Solar Panels Customer support Manuals.\"\n", + "foundation_model = \"anthropic.claude-3-sonnet-20240229-v1:0\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "For this notebook, we'll create a Knowledge Base with an Amazon S3 data source." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data_bucket_name = f'solar-panel-support-agent-{suffix}-bucket' # replace it with your first bucket name.\n", + "data_sources=[{\"type\": \"S3\", \"bucket_name\": data_bucket_name}]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create the Amazon S3 bucket and upload the sample documents" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import botocore\n", + "import os\n", + "\n", + "def create_s3_bucket(bucket_name, region=None):\n", + " s3 = boto3.client('s3', region_name=region)\n", + "\n", + " try:\n", + " if region is None or region == 'us-east-1':\n", + " s3.create_bucket(Bucket=bucket_name)\n", + " else:\n", + " s3.create_bucket(\n", + " Bucket=bucket_name,\n", + " CreateBucketConfiguration={'LocationConstraint': region}\n", + " )\n", + " print(f\"βœ… Bucket '{bucket_name}' created successfully.\")\n", + " except botocore.exceptions.ClientError as e:\n", + " print(f\"❌ Failed to create bucket: {e.response['Error']['Message']}\")\n", + "\n", + "create_s3_bucket(data_bucket_name, region)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def upload_directory(path, bucket_name):\n", + " for root,dirs,files in os.walk(path):\n", + " for file in files:\n", + " file_to_upload = os.path.join(root,file)\n", + " print(f\"uploading file {file_to_upload} to {bucket_name}\")\n", + " s3_client.upload_file(file_to_upload,bucket_name,file)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "upload_directory(\"./data\", data_bucket_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create the Knowledge Base\n", + "We are now going to create the Knowledge Base using the abstraction located in the 
helper function we previously downloaded." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "knowledge_base = BedrockKnowledgeBase(\n", + " kb_name=f'{knowledge_base_name}',\n", + " kb_description=knowledge_base_description,\n", + " data_sources=data_sources,\n", + " chunking_strategy = \"FIXED_SIZE\", \n", + " suffix = f'{suffix}-f'\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Start ingestion job\n", + "Once the KB and data source created, we can start the ingestion job for the data source. During the ingestion job, KB will fetch the documents in the data source, pre-process it to extract text, chunk it based on the chunking size provided, create embeddings of each chunk and then write it to the vector database, in this case OSS." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# ensure that the kb is available\n", + "time.sleep(30)\n", + "# sync knowledge base\n", + "knowledge_base.start_ingestion_job()\n", + "# keep the kb_id for invocation later in the invoke request\n", + "kb_id = knowledge_base.get_knowledge_base_id()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Test the Knowledge Base\n", + "We can now test the Knowledge Base to verify the documents have been ingested properly." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query = \"How to install sunpower X?\"\n", + "foundation_model = \"anthropic.claude-3-5-sonnet-20240620-v1:0\"\n", + "\n", + "response = bedrock_agent_runtime_client.retrieve_and_generate(\n", + " input={\n", + " \"text\": query\n", + " },\n", + " retrieveAndGenerateConfiguration={\n", + " \"type\": \"KNOWLEDGE_BASE\",\n", + " \"knowledgeBaseConfiguration\": {\n", + " 'knowledgeBaseId': kb_id,\n", + " \"modelArn\": \"arn:aws:bedrock:{}::foundation-model/{}\".format(region, foundation_model),\n", + " \"retrievalConfiguration\": {\n", + " \"vectorSearchConfiguration\": {\n", + " \"numberOfResults\":5\n", + " } \n", + " }\n", + " }\n", + " }\n", + ")\n", + "\n", + "print(response['output']['text'],end='\\n'*2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "os.environ['AWS_REGION'] = region\n", + "os.environ['KNOWLEDGE_BASE_ID'] = kb_id\n", + "os.environ['MIN_SCORE'] = \"0.1\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create Amazon Bedrock Guardrail\n", + "\n", + "Now we'll create a guardrail for our Solar Panel Customer Support agent to ensure safe and appropriate responses." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "guardrail_name=f\"customer-support-{suffix}\"\n", + "guardrail_description=\"This guardrail is for a Solar Panel Customer support agent to avoid unsafe, violations or investment conversations\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + "    # Check if guardrail already exists\n", + "    guardrail_exists = False\n", + "    existing_guardrail_id = None\n", + "    \n", + "    response = bedrock_client.list_guardrails(maxResults=100)\n", + "    for guardrail in response.get('guardrails', []):\n", + "        if guardrail.get('name') == guardrail_name:\n", + "            guardrail_exists = True\n", + "            existing_guardrail_id = guardrail.get(\"id\")\n", + "            break\n", + "    \n", + "    # If guardrail exists, delete it to recreate\n", + "    if guardrail_exists:\n", + "        print(f\"Guardrail {guardrail_name} already exists, deleting it to recreate...\")\n", + "        bedrock_client.delete_guardrail(guardrailIdentifier=existing_guardrail_id)\n", + "        time.sleep(30)  # Wait for deletion to complete\n", + "    \n", + "    # Create new guardrail (use guardrail_name/guardrail_description so the\n", + "    # existence check above matches the name we actually create)\n", + "    print(f\"Creating guardrail {guardrail_name}...\")\n", + "    response = bedrock_client.create_guardrail(\n", + "        name=guardrail_name,\n", + "        description=guardrail_description,\n", + "        topicPolicyConfig={\n", + "            'topicsConfig': [\n", + "                {\n", + "                    'name': 'Fiduciary Advice',\n", + "                    'definition': 'Providing personalized advice or recommendations on managing financial assets, investments, or trusts in a fiduciary capacity or assuming related obligations and liabilities.',\n", + "                    'examples': [\n", + "                        'What stocks should I invest in for my retirement?',\n", + "                        'Is it a good idea to put my money in a mutual fund?',\n", + "                        'How should I allocate my 401(k) investments?',\n", + "                        'What type of trust fund should I set up for my children?',\n", + 
'Should I hire a financial advisor to manage my investments?'\n", + " ],\n", + " 'type': 'DENY'\n", + " }\n", + " ]\n", + " },\n", + " contentPolicyConfig={\n", + " 'filtersConfig': [\n", + " {\n", + " 'type': 'SEXUAL',\n", + " 'inputStrength': 'HIGH',\n", + " 'outputStrength': 'HIGH'\n", + " },\n", + " {\n", + " 'type': 'VIOLENCE',\n", + " 'inputStrength': 'HIGH',\n", + " 'outputStrength': 'HIGH'\n", + " },\n", + " {\n", + " 'type': 'HATE',\n", + " 'inputStrength': 'HIGH',\n", + " 'outputStrength': 'HIGH'\n", + " },\n", + " {\n", + " 'type': 'INSULTS',\n", + " 'inputStrength': 'HIGH',\n", + " 'outputStrength': 'HIGH'\n", + " },\n", + " {\n", + " 'type': 'MISCONDUCT',\n", + " 'inputStrength': 'HIGH',\n", + " 'outputStrength': 'HIGH'\n", + " },\n", + " {\n", + " 'type': 'PROMPT_ATTACK',\n", + " 'inputStrength': 'HIGH',\n", + " 'outputStrength': 'NONE'\n", + " }\n", + " ]\n", + " },\n", + " wordPolicyConfig={\n", + " 'wordsConfig': [\n", + " {'text': 'fiduciary advice'},\n", + " {'text': 'investment recommendations'},\n", + " {'text': 'stock picks'},\n", + " {'text': 'financial planning guidance'},\n", + " {'text': 'portfolio allocation advice'},\n", + " {'text': 'retirement fund suggestions'},\n", + " {'text': 'wealth management tips'},\n", + " {'text': 'trust fund setup'},\n", + " {'text': 'investment strategy'},\n", + " {'text': 'financial advisor recommendations'}\n", + " ],\n", + " 'managedWordListsConfig': [\n", + " {\n", + " 'type': 'PROFANITY'\n", + " }\n", + " ]\n", + " },\n", + " blockedInputMessaging='I apologize, but I am not able to provide an answer to that question.',\n", + " blockedOutputsMessaging='I apologize, but I am not able to provide an answer to that question.',\n", + ")\n", + " \n", + " guardrail_id = response['guardrailId']\n", + " guardrail_version = response['version']\n", + " \n", + " print(f\"Successfully created guardrail with ID {guardrail_id} and version {guardrail_version}\")\n", + " \n", + "except Exception as e:\n", + " 
print(f\"Error creating guardrail: {e}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Test the Guardrail\n", + "\n", + "Let's test our guardrail to ensure it's working properly." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test function to check if input/output is blocked by guardrail\n", + "def test_guardrail(text, source_type='INPUT'):\n", + "    response = bedrock_runtime.apply_guardrail(\n", + "        guardrailIdentifier=guardrail_id,\n", + "        guardrailVersion=guardrail_version,\n", + "        source=source_type,  # can be 'INPUT' or 'OUTPUT'\n", + "        content=[{\"text\": {\"text\": text}}]\n", + "    )\n", + "\n", + "    # New response format uses different fields\n", + "    print(f\"Action: {response.get('action')}\")\n", + "    print(f\"Action Reason: {response.get('actionReason', 'None')}\")\n", + "\n", + "    # Check if content was blocked\n", + "    is_blocked = response.get('action') == 'GUARDRAIL_INTERVENED'\n", + "    print(f\"Content {source_type} blocked: {is_blocked}\")\n", + "\n", + "    if is_blocked:\n", + "        # Print topic policies that were triggered\n", + "        assessments = response.get('assessments', [])\n", + "        if assessments and 'topicPolicy' in assessments[0]:\n", + "            print(\"Blocked topics:\", [topic.get('name') for topic in\n", + "                                      assessments[0]['topicPolicy'].get('topics', [])\n", + "                                      if topic.get('action') == 'BLOCKED'])\n", + "\n", + "    # Print the modified output if available\n", + "    if 'outputs' in response and response['outputs']:\n", + "        print(\"Modified content:\", response['outputs'][0].get('text', 'None'))\n", + "\n", + "    return response\n", + "\n", + "\n", + "# Test input that should be blocked\n", + "print(\"\\nTesting input that should be blocked:\")\n", + "test_guardrail(\"What stocks should I invest in for my retirement?\")\n", + "\n", + "\n", + "# Test input that should not be blocked\n", + "print(\"\\nTesting input that should not be blocked:\")\n", + 
"test_guardrail(\"How do I maintain my Sunpower X solar panel?\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Save Configuration for Agent Setup\n", + "\n", + "Save the configuration details to be used later when setting up the Solar Panel Customer Support agent." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create configuration dictionary\n", + "config = {\n", + " \"guardrail\": {\n", + " \"id\": guardrail_id,\n", + " \"version\": guardrail_version,\n", + " \"name\": guardrail_name,\n", + " },\n", + " \"knowledge_base\": {\n", + " \"id\": kb_id,\n", + " \"name\": knowledge_base_name,\n", + " },\n", + " \"region\": region\n", + "}\n", + "\n", + "# Save configuration to file\n", + "config_path = \"solar_panel_support_config.json\"\n", + "with open(config_path, 'w') as f:\n", + " json.dump(config, f, indent=2)\n", + " \n", + "print(f\"Configuration saved to {config_path}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set up Amazon DynamoDB for Customer Profiles\n", + "\n", + "In this section, we'll set up a Amazon DynamoDB table to store customer profile information. This will allow our Solar Panel Customer Support agent to retrieve and update customer information during support conversations." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.append('utils')\n", + "\n", + "# Now let's set up the DynamoDB table for customer profiles\n", + "# This uses the script we created in utils/customer_dynamodb.py\n", + "from customer_dynamodb import SolarCustomerDynamoDB\n", + "\n", + "# Initialize the DynamoDB helper\n", + "db = SolarCustomerDynamoDB()\n", + "\n", + "# Check if the table already exists both in SSM and in DynamoDB\n", + "existing_table_name = db.get_table_name_from_ssm()\n", + "table_exists = False\n", + "if existing_table_name:\n", + " # Verify the table actually exists in DynamoDB\n", + " table_exists = db.table_exists(existing_table_name)\n", + " \n", + "if table_exists:\n", + " print(f\"βœ… Customer profile table '{existing_table_name}' already exists.\")\n", + "else:\n", + " # Table doesn't exist yet, create it and populate with data\n", + " # If we found a parameter but table doesn't exist, we'll create a new one\n", + " table_name = f\"SolarCustomerProfiles-{suffix}\"\n", + " print(f\"Creating table '{table_name}'...\")\n", + " \n", + " # Create the DynamoDB table\n", + " table = db.create_table(table_name)\n", + " \n", + " # Generate synthetic customer profiles directly in DynamoDB\n", + " print(\"Generating synthetic customer profiles...\")\n", + " customer_ids = db.generate_synthetic_profiles(count=10, table_name=table_name)\n", + " print(f\"βœ… Successfully created and populated table with {len(customer_ids)} customer profiles\")\n", + " \n", + " # Test by retrieving one profile\n", + " sample_customer = db.get_profile_by_id(customer_ids[0], table_name)\n", + " if sample_customer:\n", + " print(f\"Sample customer: {sample_customer['name']} from {sample_customer['country']}\")\n", + "\n", + "# Add the DynamoDB table name to our configuration\n", + "config[\"customer_table\"] = existing_table_name if table_exists else table_name\n", + "with 
open(config_path, 'w') as f:\n", + " json.dump(config, f, indent=2)\n", + " \n", + "print(f\"Updated configuration with customer table information\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "In this notebook, we have successfully:\n", + "\n", + "1. Created a Knowledge Base for solar panel installation and maintenance information\n", + "2. Uploaded solar panel documentation to the Knowledge Base\n", + "3. Created a Guardrail to ensure safe and appropriate responses\n", + "4. Tested both the Knowledge Base and Guardrail to confirm they're working properly\n", + "5. Saved the configuration for future use when setting up the Solar Panel Customer Support agent\n", + "\n", + "The next steps would be to create the Solar Panel Customer Support agent itself, which will utilize this Amazon Bedrock Knowledge Base, Guardrail and DynamoDB Database." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we have set up all the components needed for our Solar Panel Customer Support agent:\n", + "\n", + "1. Amazon Bedrock Knowledge Base - Contains solar panel manuals and documentation\n", + "2. Amazon Bedrock Guardrail - Safeguards and maintains appropriate responses and content filtering\n", + "3. Amazon DynamoDB Table - Stores customer profile information\n", + "\n", + "These resources are ready to be used by our agent implementation." 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/02-samples/11-customer-support-solar-panel/02_Customer_support_OTEL_mem0_JIRA.ipynb b/02-samples/11-customer-support-solar-panel/02_Customer_support_OTEL_mem0_JIRA.ipynb new file mode 100644 index 00000000..7c365672 --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/02_Customer_support_OTEL_mem0_JIRA.ipynb @@ -0,0 +1,1113 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solar Panels Customer Support Agent \n", + "\n", + "This notebook implements an enhanced example of customer support agent for solar panel products that utilizes these key features:\n", + "\n", + "1. **Amazon Bedrock Knowledge Base**: Provides information about solar panel models, installation, and maintenance through Retrieval Augemented Generation\n", + "2. **Amazon Bedrock Guardrails**: Safeguards the Strands Agent and helps adhere to content policies\n", + "3. **Amazon DynamoDB Customer Profiles**: Stores and retrieves customer information from Amazon DynamoDB\n", + "4. **JIRA Integration**: Creates and manages support tickets in JIRA\n", + "5. **Mem0 Memory**: Provides long-term conversation memory for personalized interactions across sessions.\n", + "6. **Custom Support Tools**: Specialized tools for solar system performance analysis and warranty checks\n", + "7. 
**Observability with Langfuse**: Strands Agents built-in support for observability with LangFuse\n", + "\n", + "The Strands Agent will identify the customer via their email address, retrieve their profile information from Amazon DynamoDB, create support tickets in JIRA when needed, and provide personalized support based on conversation history stored in Mem0." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Agent Details\n", + "
\n", + " \n", + "|Feature |Description |\n", + "|--------------------|---------------------------------------------------|\n", + "|Native tools used |retrieve, mem0_memory |\n", + "|Custom tools JIRA |create_solar_support_ticket, get_customer_tickets |\n", + "|Custom tools Profile Management |get_customer_profile, update_customer_profile|\n", + "|Custom tools Solar Panels |analyze_solar_system_performance,check_warranty_status |\n", + "|Agent Structure |Single agent architecture |\n", + "|AWS services used |Amazon Bedrock Knowledge Base, Amazon DynamoDB, Amazon Bedrock Guardrails|\n", + "|Integrations | Mem0, Langfuse, Atlassian JIRA |\n", + "\n", + "
" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "jp-MarkdownHeadingCollapsed": true + }, + "source": [ + "## Architecture\n", + "\n", + "
\n", + " \n", + "
\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "- Python 3.10+\n", + "- Anthropic Claude 3.7 enabled on Amazon Bedrock\n", + "- solar_panel_support_config.json populated with resources from 01_solar_panel_customer_support_setup.ipynb notebook.\n", + "\n", + "| Component | Description |\n", + "|-----------|-------------|\n", + "| AWS Account | With access to Amazon Bedrock and Amazon DynamoDB |\n", + "| Mem0 API Key | For conversation memory storage (m0-*) |\n", + "| JIRA Account Details | For ticket creation and management |\n", + "| Langfuse Account Keys| Public Key and Secret key for Observability |\n", + "| Python 3.10+ | With required packages in requirements.txt |" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Install Required Dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Install prerequisites\n", + "\n", + "!pip install -r requirements.txt\n", + "#!pip install --upgrade strands-agents strands-agents-tools\n", + "#!pip install --upgrade -q boto3" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. 
Import Libraries and Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import time\n", + "import re\n", + "import json\n", + "import uuid\n", + "import boto3\n", + "import logging\n", + "import warnings\n", + "from pathlib import Path\n", + "import datetime\n", + "import base64\n", + "from strands import Agent, tool\n", + "from strands.models import BedrockModel\n", + "from strands_tools import retrieve, mem0_memory\n", + "from langfuse import Langfuse\n", + "from dotenv import load_dotenv\n", + "\n", + "\n", + "# Import our custom tools for customer profile management with DynamoDB\n", + "from customer_profile_tools_dynamodb import get_customer_profile, update_customer_profile\n", + "from customer_profile_tools_dynamodb import analyze_solar_system_performance, check_warranty_status\n", + "# Import JIRA tools\n", + "from utils.jira_tools import create_solar_support_ticket, get_customer_tickets\n", + "from utils.customer_dynamodb import SolarCustomerDynamoDB\n", + "\n", + "# Silence deprecation warnings\n", + "warnings.filterwarnings(\"ignore\", message=\"output_format=.*\")\n", + "warnings.filterwarnings(\"ignore\", category=DeprecationWarning)\n", + "\n", + "# Set up logging\n", + "logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Load Configuration\n", + "\n", + "We'll load the Amazon Knowledge Base ID, Guardrail ID and Amazon DynamoDB ID from our configuration file that was created during setup notebook." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Load configuration from file\n", + "config_path = \"solar_panel_support_config.json\"\n", + "\n", + "with open(config_path, 'r') as f:\n", + " config = json.load(f)\n", + " \n", + "print(f\"Loaded configuration from {config_path}\")\n", + "\n", + "# Set up environment variables and configuration\n", + "GUARDRAIL_ID = config.get(\"guardrail\", {}).get(\"id\")\n", + "KB_ID = config.get(\"knowledge_base\", {}).get(\"id\")\n", + "REGION = config.get(\"region\", \"us-east-1\")\n", + "CUSTOMER_TABLE = config.get(\"customer_table\")\n", + "\n", + "# Set environment variables for tools\n", + "os.environ[\"KNOWLEDGE_BASE_ID\"] = KB_ID\n", + "os.environ[\"AWS_REGION\"] = REGION\n", + "\n", + "print(f\"Guardrail ID: {GUARDRAIL_ID}\")\n", + "print(f\"Knowledge Base ID: {KB_ID}\")\n", + "print(f\"Region: {REGION}\")\n", + "print(f\"Customer Table: {CUSTOMER_TABLE}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Display Available Test Customers\n", + "\n", + "Let's list the available test customers from the Amazon DynamoDB table." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Query the table to get all customer profiles\n", + "dynamodb = boto3.resource('dynamodb', region_name=REGION)\n", + "table = dynamodb.Table(CUSTOMER_TABLE)\n", + "response = table.scan(Limit=10) # Get up to 10 profiles\n", + "\n", + "# Display available test customers\n", + "customer_list = response.get('Items', [])\n", + "\n", + "print(\"Available test customers:\")\n", + "for i, profile in enumerate(customer_list):\n", + " print(f\"- {profile.get('name')}: ID={profile.get('customer_id')}, Email={profile.get('email')}\")\n", + " \n", + "# Show details of first customer\n", + "if customer_list:\n", + " sample = customer_list[0]\n", + " print(f\"\\nSample profile details for {sample.get('name')}:\")\n", + " print(f\"Customer ID: {sample.get('customer_id')}\")\n", + " print(f\"Email: {sample.get('email')}\")\n", + " print(f\"Country: {sample.get('country')}, State: {sample.get('state')}\")\n", + " print(f\"Purchase history: {len(sample.get('purchase_history', []))} items\")\n", + "\n", + "# Initialize the DynamoDB client\n", + "db = SolarCustomerDynamoDB(region_name=REGION)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Define System Prompt\n", + "\n", + "We'll create a comprehensive system prompt that guides the agent on how to interact with customers using all the integrated tools." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# System prompt for the solar panel agent with memory, DynamoDB and JIRA integration\n", + "system_prompt = \"\"\"\n", + "You are a helpful solar panel customer support assistant with access to customer profiles in DynamoDB, \n", + "JIRA ticketing system, and memory of past conversations using mem0.\n", + "\n", + "When interacting with customers:\n", + "1. 
First identify the customer using their email address (preferred) or customer ID\n", + "2. Access past conversation memory using mem0_memory to provide personalized continuity\n", + "3. Remember important details shared by the customer in previous conversations\n", + "\n", + "Use the knowledge base to answer questions about solar panels, including installation, maintenance, pricing, \n", + "efficiency, and technical specifications. When a customer asks about their specific solar panels or previous \n", + "issues, refer to:\n", + "- Their purchase history (available in the customer profile under the 'purchase_history' field)\n", + "- Previous conversations stored in memory\n", + "- JIRA tickets if they exist\n", + "\n", + "Always personalize your responses based on all available information. Use the customer's name, mention \n", + "specific products they own, reference their past issues, and acknowledge previous conversations when appropriate.\n", + "Do not answer about other customers. Customer X cannot access customer Y data: respond with - This request is denied for privacy.\n", + "\n", + "You have specialized tools available to help customers:\n", + "\n", + "For Memory Management:\n", + "1. mem0_memory: To store and retrieve conversation history\n", + "\n", + "For DynamoDB Access:\n", + "2. get_customer_profile: To retrieve customer details (including their purchase history) using email or ID\n", + "3. update_customer_profile: To update customer information\n", + "\n", + "For Solar Analysis:\n", + "4. analyze_solar_system_performance: To analyze a customer's solar system performance\n", + "5. check_warranty_status: To check warranty status for customer products\n", + "\n", + "For JIRA Ticketing:\n", + "6. create_solar_support_ticket: To create new support tickets in JIRA\n", + " - When creating tickets, include the customer_email parameter when available\n", + " - Always include the customer ID for proper tracking\n", + "7. 
get_customer_tickets: To retrieve customer tickets from JIRA\n", + "\n", + "\n", + "For Knowledge Base Access:\n", + "8. retrieve: To search the knowledge base for solar panel information\n", + "\n", + "Create JIRA tickets when customers report issues that cannot be immediately resolved. Tickets should include:\n", + "- Descriptive title\n", + "- Problem description\n", + "- Customer ID\n", + "- Relevant product information\n", + "\n", + "Store important information from each conversation in mem0_memory so you can refer to it in future interactions.\n", + "Always retrieve relevant memories at the beginning of the conversation to provide continuity.\n", + "\n", + "Always be polite, professional, helpful, and empathetic. Provide clear and concise information.\n", + "\"\"\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create Bedrock Model with Guardrails\n", + "\n", + "We'll create a Bedrock model that uses Claude 3.5 Sonnet with the specified guardrail." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a Bedrock model with guardrail configuration\n", + "bedrock_model = BedrockModel(\n", + " model_id=\"us.anthropic.claude-3-7-sonnet-20250219-v1:0\",\n", + " guardrail_id=GUARDRAIL_ID, # Bedrock guardrail ID\n", + " guardrail_version=\"DRAFT\", # Guardrail version\n", + " guardrail_trace=\"enabled\", # Enable trace info for debugging\n", + " boto_session=boto3.Session(region_name=REGION),\n", + " guardrail_redact_output=True, # Redact sensitive data in output\n", + " guardrail_redact_input=True # Redact sensitive data in input\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. Integration Setup\n", + "\n", + "This section covers the setup for our three key integrations:\n", + "1. OpenTelemetry (OTEL) for observability with Langfuse\n", + "2. Mem0 for conversation memory\n", + "3. 
JIRA for ticket management\n", + "\n", + "Each integration requires specific configuration and environment variables." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7.1 OpenTelemetry (OTEL) Setup\n", + "\n", + "OpenTelemetry provides standardized observability for our agent, allowing us to track interactions, performance, and errors. We'll use Langfuse as our OpenTelemetry endpoint for visualization and analysis.\n", + "\n", + "**Required Environment Variables:**\n", + "- `LANGFUSE_HOST`: The Langfuse API endpoint\n", + "- `OTEL_EXPORTER_OTLP_ENDPOINT`: Where to send OpenTelemetry data\n", + "- `OTEL_EXPORTER_OTLP_HEADERS`: Authentication for the endpoint\n", + "\n", + "**How OTEL Works with Strands:**\n", + "1. Each agent interaction generates spans and traces\n", + "2. The trace data is sent to Langfuse via OTLP (OpenTelemetry Protocol)\n", + "3. This enables analysis of agent performance, latency, and behavior" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Langfuse:** [Langfuse](https://langfuse.com/docs/integrations/strands-agents) is an open-source LLM engineering platform. It provides robust tracing, debugging, evaluation, and monitoring capabilities for AI agents and LLM applications.\n", + "\n", + "Please add your public key and secret key from API keys section in Langfuse settings in the below cell. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get keys for your project from the project settings page: https://cloud.langfuse.com\n", + "\n", + "os.environ[\"LANGFUSE_PUBLIC_KEY\"] = \"pk-your-api-key\"\n", + "os.environ[\"LANGFUSE_SECRET_KEY\"] = \"sk-your-api-key\" \n", + "#os.environ[\"LANGFUSE_HOST\"] = \"https://cloud.langfuse.com\" # πŸ‡ͺπŸ‡Ί EU region (default)\n", + "os.environ[\"LANGFUSE_HOST\"] = \"https://us.cloud.langfuse.com\" # πŸ‡ΊπŸ‡Έ US region\n", + " \n", + "# Build Basic Auth header.\n", + "LANGFUSE_AUTH = base64.b64encode(\n", + " f\"{os.environ.get('LANGFUSE_PUBLIC_KEY')}:{os.environ.get('LANGFUSE_SECRET_KEY')}\".encode()\n", + ").decode()\n", + " \n", + "# Configure OpenTelemetry endpoint & headers\n", + "os.environ[\"OTEL_EXPORTER_OTLP_ENDPOINT\"] = os.environ.get(\"LANGFUSE_HOST\") + \"/api/public/otel/v1/traces\"\n", + "os.environ[\"OTEL_EXPORTER_OTLP_HEADERS\"] = f\"Authorization=Basic {LANGFUSE_AUTH}\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7.2 Mem0 Memory Integration\n", + "\n", + "[Mem0](https://mem0.ai/) provides long-term conversation memory for our agent, allowing it to recall previous interactions with users. This notebook uses [Strands-agents-tools](https://github.com/strands-agents/tools/tree/main/src/strands_tools) in-built [mem0-memory](https://github.com/strands-agents/tools/blob/main/src/strands_tools/mem0_memory.py) tool. \n", + "\n", + "**Required Environment Variables:**\n", + "- `MEM0_API_KEY`: Your Mem0 API key for authentication\n", + "\n", + "**How Mem0 Works with Strands:**\n", + "1. Important conversation details are stored in Mem0 using the `mem0_memory` tool\n", + "2. Each memory is associated with a specific user_id ( email)\n", + "3. 
The agent can retrieve past conversation memories to provide continuity" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "os.environ[\"MEM0_API_KEY\"] = \"your-api-key\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7.3 JIRA Integration Setup\n", + "\n", + "JIRA integration allows our agent to create and manage support tickets directly. This provides a seamless way to track customer issues that require follow-up. Create a project and update the name in the .env file.\n", + "\n", + "**Required Environment Variables (stored in .env file):**\n", + "- `JIRA_INSTANCE_URL`: Your JIRA instance URL (e.g., \"https://your-domain.atlassian.net/\")\n", + "- `JIRA_USERNAME`: Your JIRA account email\n", + "- `JIRA_API_TOKEN`: API token generated from your Atlassian account\n", + "- `JIRA_CLOUD`: \"True\" if using Jira Cloud (vs. Server)\n", + "- `PROJECT_NAME`: Name of your JIRA project for tickets (e.g., \"SOLAR\")\n", + "\n", + "**How Environment Variables Work:**\n", + "1. These variables are stored in a `.env` file in the project root directory\n", + "2. The `.env` file is loaded using `python-dotenv` package\n", + "3. This keeps sensitive credentials out of code and notebooks\n", + "4. Variables can be accessed via `os.getenv(\"VARIABLE_NAME\")`\n", + "\n", + "**Example .env.example file structure:**\n", + "```\n", + "JIRA_INSTANCE_URL=https://your-domain.atlassian.net/\n", + "JIRA_USERNAME=your-email@example.com\n", + "JIRA_API_TOKEN=your-api-token\n", + "JIRA_CLOUD=True\n", + "PROJECT_NAME=SOLAR\n", + "```\n", + "\n", + "**How JIRA Integration Works:**\n", + "1. The agent uses the `create_solar_support_ticket` tool to create tickets\n", + "2. The `get_customer_tickets` tool retrieves existing tickets for a customer\n", + "3. Each ticket includes customer details and issue description\n", + "4. 
The JIRA API handles the actual ticket creation and management" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "# Load environment variables from .env file\n", + "load_dotenv()\n", + "\n", + "# Print JIRA configuration (masking sensitive data)\n", + "print(\"\\n===== JIRA Configuration =====\\n\")\n", + "print(f\"JIRA URL: {os.getenv('JIRA_INSTANCE_URL')}\")\n", + "print(f\"JIRA USERNAME: {os.getenv('JIRA_USERNAME')}\")\n", + "print(f\"JIRA API TOKEN: {'*****' if os.getenv('JIRA_API_TOKEN') else 'Not set'}\")\n", + "print(f\"JIRA CLOUD: {os.getenv('JIRA_CLOUD')}\")\n", + "print(f\"PROJECT NAME: {os.getenv('PROJECT_NAME', 'SOLAR')}\")\n", + "\n", + "# Note: If you're seeing \"None\" for these values, make sure your .env file is:\n", + "# 1. Created in the project root directory\n", + "# 2. Contains the correct variable names and values" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. Create the Support Agent\n", + "\n", + "Now we'll create the Strands Customer Support Agent with all the integrated tools." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the agent with all integrated tools\n", + "support_agent = Agent(\n", + " system_prompt=system_prompt,\n", + " model=bedrock_model,\n", + " callback_handler=None,\n", + " tools=[\n", + " # Memory tools\n", + " mem0_memory,\n", + " # DynamoDB customer profile tools\n", + " get_customer_profile,\n", + " update_customer_profile,\n", + " \n", + " # Solar analysis tools\n", + " analyze_solar_system_performance,\n", + " check_warranty_status,\n", + " \n", + " # JIRA integration tools\n", + " create_solar_support_ticket,\n", + " get_customer_tickets,\n", + " \n", + " # Knowledge base and web search\n", + " retrieve\n", + " \n", + " ],\n", + " trace_attributes={\n", + " \"session.id\": \"abc-1234\", # Example session ID\n", + " \"user.id\": \"user-email-example@domain.com\", # Example user ID\n", + " \"langfuse.tags\": [\n", + " \"dev\",\n", + " \"Strands-Project-Demo\",\n", + " \"Observability-Tutorial\",\n", + " ]\n", + " }\n", + " \n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define a test customer email - use one from the list displayed earlier\n", + "test_customer_email = \"customer1@example.com\"\n", + "\n", + "# Define a function to test the agent with a specific question\n", + "def test_agent_question(question):\n", + " \"\"\"Test the agent with a single question.\"\"\"\n", + " print(f\"\\nπŸ“ QUESTION: {question}\\n\")\n", + " \n", + " # Build context with the customer email\n", + " context = f\"The customer's email is {test_customer_email}. 
\"\n", + " full_question = context + \"\\n\\n\" + question\n", + " \n", + " # Start timing for performance tracking\n", + " start_time = time.time()\n", + " \n", + " try:\n", + " # Get response from the agent\n", + " response = support_agent(full_question)\n", + " \n", + " # Calculate response time\n", + " response_time = time.time() - start_time\n", + " print(f\"\\nπŸ€– RESPONSE:\\n{response}\")\n", + " print(f\"\\n⏱️ Response generated in {response_time:.2f} seconds\\n\")\n", + " print(\"=\" * 80)\n", + " #return response\n", + " except Exception as e:\n", + " print(f\"\\n❌ Error: {str(e)}\")\n", + " print(\"The model may have encountered an issue processing your request.\")\n", + " return None" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Test the Agent tools without the memory capability \n", + "### Test 1: Customer Profile Question\n", + "Test retrieving customer profile information" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Test agent retrieving customer profile information\n", + "test_agent_question(\"What solar panels do I have installed? When did I purchase them?\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Test 2: Warranty Status Check\n", + "Test checking warranty status for customer's solar panels" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test checking warranty status\n", + "test_agent_question(\"Is my solar panel still under warranty? 
How long do I have left on the warranty?\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Test 3: System Performance Analysis\n", + "\n", + "Test analyzing the solar system's performance" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test performance analysis\n", + "test_agent_question(\"How is my solar system performing? Is it producing the expected amount of energy?\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Test 4: Knowledge Base Query\n", + "Test retrieving information from the solar panel knowledge base" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test knowledge base retrieval\n", + "test_agent_question(\"How do I clean my SunPower X solar panels properly? What cleaning products should I use?\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Demonstrating how to store memory using the mem0_memory personalised response." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# First, let's store a memory about the customer's preference\n", + "memory_content = \"The customer mentioned they prefer to do maintenance on weekends and is interested in upgrading their inverter next year.\"\n", + "\n", + "# Store the memory\n", + "try:\n", + " result = support_agent.tool.mem0_memory(\n", + " action=\"store\", \n", + " content=memory_content, \n", + " user_id=test_customer_email,\n", + " metadata={\"timestamp\": datetime.datetime.now().isoformat()}\n", + " )\n", + " print(\"βœ… Memory stored successfully\")\n", + "except Exception as e:\n", + " print(f\"❌ Error storing memory: {e}\")\n", + " \n", + "# Now let's test if the agent retrieves and uses this memory in its response\n", + "test_agent_question(\"When would be a good time to schedule my annual maintenance check?\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 9. Helper Functions\n", + "\n", + "These functions will help manage customer interactions and handle memory storage and retrieval." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define common regex patterns once\n", + "EMAIL_REGEX = r'\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b'\n", + "\n", + "# Function to retrieve memories for a customer\n", + "def retrieve_customer_memories(customer_email):\n", + " \"\"\"Retrieve conversation memories for a customer using the mem0_memory tool.\"\"\"\n", + " try:\n", + " # Use the agent to call the mem0_memory tool\n", + " results = support_agent.tool.mem0_memory(action=\"list\", user_id=customer_email)\n", + " \n", + " # Check if we have any memories\n", + " if isinstance(results, dict) and results.get('status') == 'error':\n", + " print(\"No previous memories found. 
Starting fresh conversation.\")\n", + " return []\n", + " \n", + " # Format the results\n", + " if isinstance(results, list):\n", + " return results\n", + " elif isinstance(results, dict) and 'results' in results:\n", + " return results['results']\n", + " else:\n", + " return []\n", + " except Exception as e:\n", + " print(f\"Error retrieving memories: {str(e)}\")\n", + " return []\n", + "\n", + "# Function to store a conversation in memory\n", + "def store_conversation_memory(customer_email, query, response):\n", + " \"\"\"Store a conversation interaction in mem0.\"\"\"\n", + " try:\n", + " # Format the conversation entry\n", + " memory_entry = f\"User: {query}\\nAssistant: {response}\"\n", + " \n", + " # Add to mem0 using the agent's tool\n", + " result = support_agent.tool.mem0_memory(\n", + " action=\"store\", \n", + " content=memory_entry, \n", + " user_id=customer_email,\n", + " metadata={\"timestamp\": datetime.datetime.now().isoformat()}\n", + " )\n", + " print(\"βœ“ Conversation stored in memory\")\n", + " return True\n", + " except Exception as e:\n", + " print(f\"Error storing memory: {str(e)}\")\n", + " return False\n", + "\n", + "# Function to ask a question to the support agent\n", + "def ask_support_question(question, customer_email):\n", + " \"\"\"Ask the support agent a question with customer context detection.\n", + "\n", + " Args:\n", + " question (str): The customer support question\n", + " customer_email (str): The customer's email address\n", + "\n", + " Returns:\n", + " str: Response from the agent\n", + " \"\"\"\n", + " # Build context message to include with the question\n", + " context = f\"The customer's email is {customer_email}. 
\"\n", + " \n", + " # Get customer memories and add to context\n", + " memories = retrieve_customer_memories(customer_email)\n", + " if memories:\n", + " memory_context = \"\\n\\nHere are relevant past conversations with this customer:\\n\"\n", + " \n", + " # Include only the most recent 3 memories for context\n", + " for i, memory in enumerate(memories[:3]):\n", + " memory_content = memory.get('memory', '')\n", + " memory_context += f\"Memory {i+1}: {memory_content}\\n\\n\"\n", + " \n", + " context += memory_context\n", + " \n", + " # Combine context and question\n", + " full_question = context + \"\\n\\n\" + question\n", + " \n", + " # Display processing message\n", + " print(\"\\nProcessing your question...\\n\")\n", + " \n", + " # Start timing for performance tracking\n", + " start_time = time.time()\n", + " \n", + " # Get response from the agent\n", + " try:\n", + " response = support_agent(full_question)\n", + " \n", + " # Store this conversation in memory\n", + " store_conversation_memory(customer_email, question, response)\n", + " \n", + " # Calculate response time\n", + " response_time = time.time() - start_time\n", + " print(f\"\\n(Response generated in {response_time:.2f} seconds)\\n\")\n", + " \n", + " return response\n", + " except Exception as e:\n", + " print(f\"\\nError: {str(e)}\")\n", + " print(\"The model may have encountered an issue processing your request.\")\n", + " return None" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 10. Solar panels customer support with memory and ticket creation to answer questions based on contextual memory using mem0 storage and retrieval\n", + "\n", + "\n", + "Now let's demonstrate the support agent session for a user." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"🌐 Solar Panel Customer Support Agent 🌐\\n\")\n", + "print(\"This agent helps with solar panel customer support questions.\")\n", + "print(\"It will identify you by your email, access your profile from DynamoDB,\")\n", + "print(\"remember past conversations, and create JIRA tickets when needed.\\n\")\n", + "\n", + "\n", + "customer_email= \"customer5@example.com\"\n", + "\n", + "\n", + "query = \"My SunPower X panel is showing blinking red light and isn't producing any power. Create a support ticket for it.\"\n", + "\n", + "response = ask_support_question(query, customer_email)\n", + "if response:\n", + " print(f\"\\n{response}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query = \"What's the status of my support tickets? Do I have any open tickets?\"\n", + "\n", + "response = ask_support_question(query, customer_email)\n", + "if response:\n", + " print(f\"\\n{response}\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Amazon Bedrock Guardrail Test" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query = \"Should I spend to invest in the stock market for better returns? What stocks would you recommend for green energy investments?\"\n", + "\n", + "response = ask_support_question(query, customer_email)\n", + "if response:\n", + " print(f\"\\n{response}\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Congratulations! \n", + "You created a Customer support bot with various features using Strands Agents SDK. For the Agent builder to monitor the interactions from the Customer support bot for observability, the traces are logged in Langfuse using the environment variables you have defined. 
When you open your Langfuse dashboard, you can see the details on the input questions, tool use and the output in the traces, token usage, latency etc. \n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# JIRA board \n", + "\n", + "In your domain in Atlassian, under your project you will be able to see the tickets created and you can update, assign them directly in JIRA project that you created. \n", + "\n", + "![JIRA Tickets](images/JIRA.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Langfuse Dashboard\n", + "\n", + "In the Langfuse Dashboard you will be able to view the Traces in detail. The trace attributes, such as session.id, user.id, and langfuse.tags, are sent to Langfuse with the traces and help organize, filter, and analyze traces in the Langfuse UI.\n", + "\n", + "![Langfuse Traces](images/langfuse_traces.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Mem0 Dashboard\n", + "\n", + "In the mem0 Dashboard you will be able to view the conversations stored as shown in the image below.\n", + "\n", + "![Mem0 dashboard](images/mem0_memories.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 11. Example Questions for Each Integrated Component\n", + "\n", + "Below are example questions that demonstrate each of our integrated tools. You can try these or similar questions in the interactive session above." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Profile Questions\n", + "\n", + "- \"Can you tell me what solar panels I have?\"\n", + "- \"What's my purchase history?\"\n", + "- \"When did I buy my solar system?\"\n", + "- \"What are my contact preferences?\"\n", + "\n", + "### Solar System Performance Questions\n", + "\n", + "- \"How is my solar system performing?\"\n", + "- \"Can you analyze my system performance for the last month?\"\n", + "- \"What's the efficiency of my solar panels?\"\n", + "\n", + "### Warranty and Maintenance Questions\n", + "\n", + "- \"What's the warranty status on my SunPower X panels?\"\n", + "- \"How do I submit a warranty claim?\"\n", + "- \"When does my panel warranty expire?\"\n", + "- \"What's covered under my solar panel warranty?\"\n", + "\n", + "### JIRA Ticket Questions\n", + "\n", + "- \"I'm having issues with my SunPower X panels with battery drainage, can you create a ticket?\"\n", + "- \"What's the status of my support tickets?\"\n", + "- \"I need someone to come fix my solar panel, it's showing error code E5.\"\n", + "- \"Can you look up my previous support tickets?\"\n", + "\n", + "### Product Manual Questions\n", + "\n", + "- \"How do I clean my SunPower X panels?\"\n", + "- \"What's the installation process for SunPower Y panels?\"\n", + "- \"What maintenance do solar panels need?\"\n", + "- \"What are the features of the SunPower Double-X model?\"\n", + "\n", + "### Memory-Based Personalization Questions\n", + "\n", + "- \"What did we discuss last time?\"\n", + "- \"You mentioned something about panel cleaning before, what was that?\"\n", + "- \"What recommendation did you give me for improving efficiency?\"\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 12. Clean up\n", + "\n", + "When you're done with this notebook, you can clean up the resources." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Clean up resources when you're done with experimenting this example\n", + "\n", + "\n", + "# Import needed utilities\n", + "import boto3\n", + "from utils.customer_dynamodb import SolarCustomerDynamoDB\n", + "from utils.knowledge_base import BedrockKnowledgeBase\n", + "import sys\n", + "sys.path.append('utils')\n", + "\n", + "print(\"πŸ“‹ Starting cleanup of resources...\")\n", + "\n", + "# Initialize resources\n", + "db = SolarCustomerDynamoDB(region_name=REGION)\n", + "bedrock_client = boto3.client('bedrock', region_name=REGION)\n", + "s3_client = boto3.client('s3', region_name=REGION)\n", + "\n", + "# 1. Delete DynamoDB table\n", + "if CUSTOMER_TABLE:\n", + " print(f\"πŸ—ƒοΈ Cleaning up DynamoDB table {CUSTOMER_TABLE}...\")\n", + " \n", + " # First verify table actually exists\n", + " if db.table_exists(CUSTOMER_TABLE):\n", + " result = db.delete_table(CUSTOMER_TABLE)\n", + " if result:\n", + " print(f\"βœ… DynamoDB table and SSM parameter cleaned up successfully\")\n", + " else:\n", + " print(f\"❌ Failed to clean up DynamoDB table\")\n", + " else:\n", + " print(f\"ℹ️ Table {CUSTOMER_TABLE} doesn't exist, cleaning up SSM parameter only\")\n", + " \n", + " # Clean up SSM parameter even if table doesn't exist\n", + " try:\n", + " ssm_client = boto3.client('ssm', region_name=REGION)\n", + " ssm_client.delete_parameter(Name='solar-customer-table-name')\n", + " print(\"βœ… Cleaned up SSM parameter\")\n", + " except Exception as e:\n", + " if 'ParameterNotFound' in str(e):\n", + " print(\"ℹ️ SSM parameter doesn't exist\")\n", + " else:\n", + " print(f\"❌ Error cleaning up SSM parameter: {e}\")\n", + "\n", + "# 2. 
Delete Knowledge Base and related resources\n", + "if KB_ID:\n", + " print(f\"πŸ“š Cleaning up Knowledge Base {KB_ID}...\")\n", + " # Create boto3 client to delete KB directly\n", + " bedrock_agent_client = boto3.client('bedrock-agent', region_name=REGION)\n", + "\n", + " try:\n", + " # List and delete all data sources first\n", + " data_sources =bedrock_agent_client.list_data_sources(knowledgeBaseId=KB_ID)[\"dataSourceSummaries\"]\n", + " print(f\"Found {len(data_sources)} data sources to delete\")\n", + "\n", + " for ds in data_sources:\n", + " print(f\"Deleting data source: {ds['dataSourceId']}\")\n", + " bedrock_agent_client.delete_data_source(\n", + " dataSourceId=ds[\"dataSourceId\"],\n", + " knowledgeBaseId=KB_ID\n", + " )\n", + "\n", + " # Then delete the knowledge base\n", + " print(f\"Deleting knowledge base with ID: {KB_ID}\")\n", + " bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=KB_ID)\n", + " print(f\"βœ… Successfully deleted knowledge base\")\n", + " except Exception as e:\n", + " print(f\"Error during deletion: {str(e)}\")\n", + "\n", + "# 3. Delete Guardrail\n", + "if GUARDRAIL_ID:\n", + " print(f\"πŸ›‘οΈ Cleaning up Guardrail {GUARDRAIL_ID}...\")\n", + " try:\n", + " bedrock_client.delete_guardrail(guardrailIdentifier=GUARDRAIL_ID)\n", + " print(f\"βœ… Guardrail deletion initiated. This may take several minutes to complete.\")\n", + " except Exception as e:\n", + " print(f\"❌ Error cleaning up Guardrail: {e}\")\n", + "\n", + "print(\"🧹 Cleanup process completed\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary and Conclusion\n", + "\n", + "This notebook demonstrated how to build an example of a comprehensive solar panel customer support agent that integrates multiple platforms and services:\n", + "\n", + "### Key Components Implemented\n", + "\n", + "1. 
**Amazon Bedrock Knowledge Base Integration**\n", + " - Provides solar panel product information, installation guides, and maintenance procedures\n", + " - Enables context-aware responses using domain-specific knowledge\n", + "\n", + "2. **Amazon Bedrock Guardrails**\n", + " - Safe and appropriate responses\n", + " - Prevents prohibited content like investment advice\n", + " - Maintains professional customer interactions\n", + "\n", + "3. **Amazon DynamoDB Customer Profiles**\n", + " - Stores and retrieves customer information\n", + " - Enables personalized support based on purchase history\n", + " - Maintains customer preferences and details\n", + "\n", + "4. **JIRA Integration**\n", + " - Creates support tickets for issues requiring follow-up\n", + " - Retrieves existing ticket information\n", + " - Provides structured tracking of customer issues\n", + "\n", + "5. **Mem0 Memory**\n", + " - Provides conversation history and context\n", + " - Enables personalized responses based on past interactions\n", + " - Improves customer experience through continuity\n", + "\n", + "6. 
**OpenTelemetry Observability**\n", + " - Track and monitor Strands Agent performance and interactions\n", + " - Provides insights into agent behavior and response quality\n", + " - Enables observability for continuous improvement through monitoring\n", + "\n", + "### Use Cases Demonstrated\n", + "\n", + "- Customer profile retrieval\n", + "- Warranty status checking\n", + "- Solar system performance analysis\n", + "- Product Information and support queries\n", + "- Support ticket creation and information\n", + "- Previous conversations recall\n", + "- Guardrail activation\n", + "\n", + "## Next Steps\n", + "\n", + "To extend this example you can augment the current agent's capabilities with web search to include real time information about solar panel rules and regulations into the Strands Agent.\n", + "\n", + "\n", + "\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/02-samples/11-customer-support-solar-panel/README.md b/02-samples/11-customer-support-solar-panel/README.md new file mode 100644 index 00000000..041064c3 --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/README.md @@ -0,0 +1,86 @@ +# Solar Panel Customer Support System + +This example implements a comprehensive solar panel customer support agent using Amazon Bedrock, Amazon DynamoDB, Atlassian JIRA, Mem0 and Langfuse integration. 
The Strands Agent provides personalized customer support with conversation memory, automated ticketing, observability for monitoring and tracking, and custom tools created using [Strands Agents SDK](https://strandsagents.com/latest/) related to solar panel maintenance! + +## Architecture Overview + +The Solar Panel Customer Support System combines several technologies to create a support solution: + +![Architecture](./images/architecture.png) + +## Prerequisites + +Python 3.10+ +Anthropic Claude 3.7 enabled on Amazon Bedrock + +| Component | Description | +|-----------|-------------| +| AWS Account | With access to Amazon Bedrock and Amazon DynamoDB | +| Mem0 API Key | For conversation memory storage (m0-*) | +| JIRA Account | For ticket creation and management | +| Langfuse Account Keys | Public key and secret key for observability | +| Python 3.10+ | With required packages in requirements.txt | + +## Project Structure + +- `01_solar_panel_customer_support_setup.ipynb`: Sets up required AWS resources (DynamoDB, Knowledge Base, Guardrails) +- `02_Customer_support_OTEL_mem0_JIRA.ipynb`: Implements the customer support agent with integrations +- `customer_profile_tools_dynamodb.py`: Contains tools for customer profile management +- `utils/`: Directory with utility functions for JIRA, DynamoDB, and Knowledge Base operations +- `.env.example`: Example environment variables file for JIRA integration + +## Setup Instructions + +### Step 1: Run the Setup Notebook + +Start by running the `01_solar_panel_customer_support_setup.ipynb` notebook which: + +1. Creates a DynamoDB table for customer profiles +2. Populates the table with sample customer data +3. Creates an Amazon Bedrock Knowledge Base with solar panel documentation +4. Sets up Amazon Bedrock Guardrails for content safety +5. 
Generates a configuration file (`solar_panel_support_config.json`) containing all resource IDs + +This configuration file will be used by the second notebook to connect to the created resources. + +### Step 2: Configure JIRA Integration + +If you want to use the JIRA ticketing system integration: + +1. Rename `.env.example` to `.env` and update it with your JIRA credentials: + ``` + JIRA_INSTANCE_URL=https://your-domain.atlassian.net/ + JIRA_USERNAME=your-email@example.com + JIRA_API_TOKEN=your-api-token + JIRA_CLOUD=True + PROJECT_NAME=SOLAR + ``` + +2. Create a project named "SOLAR" in your JIRA instance + +### Step 3: Run the Customer Support Agent Notebook + +- **The configuration file**: The first notebook creates a `solar_panel_support_config.json` file that contains all resource IDs. This file is used by the second notebook to connect to the right resources. Make sure this file exists before running the second notebook. + +Run `02_Customer_support_OTEL_mem0_JIRA.ipynb` which: + +1. Loads the configuration from `solar_panel_support_config.json` +2. Sets up integrations with Mem0 for memory, JIRA for ticketing and Langfuse for Observability +3. Creates the Strands agent with all the necessary tools +4. Tests the agent with various customer support scenarios +5. The notebook outputs are verbose to showcase the underlying steps. + +## Clean up + +- **Resource Cleanup**: At the end of the `02_Customer_support_OTEL_mem0_JIRA.ipynb` notebook, please run the cleanup section that deletes created Amazon resources. 
+ + +## Features + +- Customer profile management with Amazon DynamoDB +- Custom tools created such as Solar system performance analysis, Warranty status checking +- JIRA ticket creation and management +- Conversation memory and personalisation with Mem0 +- Amazon Knowledge base integration for product information +- OpenTelemetry observability for a Strands Agent with Langfuse + diff --git a/02-samples/11-customer-support-solar-panel/customer_profile_tools_dynamodb.py b/02-samples/11-customer-support-solar-panel/customer_profile_tools_dynamodb.py new file mode 100644 index 00000000..57d5e119 --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/customer_profile_tools_dynamodb.py @@ -0,0 +1,272 @@ +from strands import tool +from typing import Dict, Optional, List +import datetime +import os +import boto3 +from utils.customer_dynamodb import SolarCustomerDynamoDB + +# Initialize the DynamoDB customer profile manager +db = SolarCustomerDynamoDB() + +# Try to get table name from SSM parameter store +table_name = db.get_table_name_from_ssm() +if not table_name: + # Default table name if not found in parameter store + table_name = "SolarCustomerProfiles" + print(f"Warning: Table name not found in parameter store, using default: {table_name}") + +@tool +def get_customer_profile(customer_id: str = None, email: str = None) -> Dict: + """ + Get customer profile information by customer ID or email from DynamoDB. 
+ + Args: + customer_id (str, optional): The customer ID to lookup + email (str, optional): The customer email to lookup + + Returns: + dict: Customer profile information or error message + """ + if not customer_id and not email: + return {"status": "error", "message": "Either customer_id or email must be provided"} + + profile = None + if customer_id: + profile = db.get_profile_by_id(customer_id, table_name) + elif email: + profile = db.get_profile_by_email(email, table_name) + + if not profile: + return {"status": "error", "message": "Customer profile not found"} + + return profile + + +@tool +def update_customer_profile(customer_id: str, updates: Dict) -> Dict: + """ + Update customer profile information in DynamoDB. + + Args: + customer_id (str): The customer ID to update + updates (dict): The updates to apply to the profile + + Returns: + dict: Updated customer profile or error message + """ + updated_profile = db.update_profile(customer_id, updates, table_name) + if not updated_profile: + return {"status": "error", "message": "Customer profile not found or update failed"} + + return updated_profile + + +@tool +def analyze_solar_system_performance(customer_id: str = None, email: str = None, time_period: str = "month") -> Dict: + """ + Analyze a customer's solar system performance based on their installed products. 
+ + Args: + customer_id (str, optional): The customer ID to lookup + email (str, optional): The customer email to lookup + time_period (str): Period for analysis ('month', 'quarter', 'year') + + Returns: + dict: Performance analysis results or error message + """ + if not customer_id and not email: + return {"status": "error", "message": "Either customer_id or email must be provided"} + + profile = None + if customer_id: + profile = db.get_profile_by_id(customer_id, table_name) + elif email: + profile = db.get_profile_by_email(email, table_name) + + if not profile: + return {"status": "error", "message": "Customer profile not found"} + + # Simulate performance analysis based on purchased products + purchase_history = profile.get('purchase_history', []) + panel_purchases = [p for p in purchase_history if p.get("product_type") == "panel"] + + if not panel_purchases: + return {"status": "error", "message": "No solar panels found in customer purchase history"} + + # Simple simulated performance analysis + total_capacity = 0 + panel_models = [] + for purchase in panel_purchases: + # Estimate capacity based on product name + model_name = purchase.get("product_name", "") + quantity = purchase.get("quantity", 1) + + # Map panel models to estimated wattage + wattage = 0 + if "SunPower X" in model_name: + wattage = 320 + elif "SunPower Y" in model_name: + wattage = 290 + elif "SunPower Double-X" in model_name: + wattage = 400 + else: + wattage = 300 # default estimate + + total_capacity += wattage * quantity + panel_models.append({"model": model_name, "quantity": quantity, "wattage": wattage}) + + # Simulate performance data + today = datetime.datetime.now() + + # Performance depends on time period + if time_period == "month": + days = 30 + efficiency = 0.92 # 92% of expected performance + elif time_period == "quarter": + days = 90 + efficiency = 0.89 # 89% of expected performance + else: # year + days = 365 + efficiency = 0.87 # 87% of expected performance + + # Simulate 
daily production (simplified calculation) + avg_daily_sun_hours = 5.5 + expected_daily_kwh = total_capacity * avg_daily_sun_hours / 1000 + actual_daily_kwh = expected_daily_kwh * efficiency + + # Calculate period totals + expected_kwh = expected_daily_kwh * days + actual_kwh = actual_daily_kwh * days + + # Return analysis results + return { + "status": "success", + "customer_name": profile.get('name'), + "system_details": { + "total_capacity_watts": total_capacity, + "panel_models": panel_models + }, + "performance_analysis": { + "time_period": time_period, + "expected_kwh_production": round(expected_kwh, 2), + "actual_kwh_production": round(actual_kwh, 2), + "performance_ratio": round(efficiency * 100, 1), + "avg_daily_production_kwh": round(actual_daily_kwh, 2), + }, + "recommendations": generate_recommendations(efficiency) + } + + +@tool +def check_warranty_status(customer_id: str = None, email: str = None, product_name: str = None) -> Dict: + """ + Check warranty status for customer products and provide claim information. 
+ + Args: + customer_id (str, optional): The customer ID to lookup + email (str, optional): The customer email to lookup + product_name (str, optional): Specific product to check warranty for + + Returns: + dict: Warranty information or error message + """ + if not customer_id and not email: + return {"status": "error", "message": "Either customer_id or email must be provided"} + + profile = None + if customer_id: + profile = db.get_profile_by_id(customer_id, table_name) + elif email: + profile = db.get_profile_by_email(email, table_name) + + if not profile: + return {"status": "error", "message": "Customer profile not found"} + + # Get all products or filter by specific product + purchase_history = profile.get('purchase_history', []) + products = purchase_history + if product_name: + products = [p for p in products if product_name.lower() in p.get("product_name", "").lower()] + + if not products: + return {"status": "error", "message": "No matching products found in purchase history"} + + # Generate warranty information for each product + warranty_info = [] + for product in products: + product_name = product.get("product_name", "Unknown Product") + purchase_date_str = product.get("purchase_date", "") + + try: + purchase_date = datetime.datetime.fromisoformat(purchase_date_str) + today = datetime.datetime.now() + + # Determine warranty length based on product type + warranty_years = 0 + if "SunPower X" in product_name: + warranty_years = 25 + elif "SunPower Y" in product_name: + warranty_years = 20 + elif "SunPower Double-X" in product_name: + warranty_years = 30 + elif "inverter" in product.get("product_type", "").lower(): + warranty_years = 10 + else: + warranty_years = 5 # default warranty + + warranty_end_date = purchase_date.replace(year=purchase_date.year + warranty_years) + days_remaining = (warranty_end_date - today).days + warranty_active = days_remaining > 0 + + warranty_info.append({ + "product_name": product_name, + "purchase_date": purchase_date_str, + 
"warranty_length_years": warranty_years, + "warranty_end_date": warranty_end_date.isoformat(), + "warranty_active": warranty_active, + "days_remaining": max(0, days_remaining), + "warranty_percentage_remaining": round(max(0, days_remaining) / (warranty_years * 365) * 100, 1), + "claim_process": get_claim_process(product_name) if warranty_active else "Warranty expired" + }) + + except (ValueError, TypeError): + warranty_info.append({ + "product_name": product_name, + "error": "Could not determine warranty status due to invalid purchase date" + }) + + return { + "status": "success", + "customer_name": profile.get('name'), + "warranty_information": warranty_info + } + + +def generate_recommendations(efficiency_ratio): + """Helper function to generate performance recommendations based on efficiency ratio""" + if efficiency_ratio >= 0.95: + return [ + "Your system is performing excellently. Continue with standard maintenance." + ] + elif efficiency_ratio >= 0.85: + return [ + "Your system is performing adequately but could be improved.", + "Consider cleaning panels to remove potential debris or dust buildup.", + "Check for any new shade sources that may have developed near panels." + ] + else: + return [ + "Your system is performing below expectations.", + "We recommend scheduling a professional inspection to identify issues.", + "Check for inverter error codes or warning lights.", + "Ensure all panels are clean and free from debris or shading.", + "Monitor performance daily to identify any patterns in reduced output." + ] + + +def get_claim_process(product_name): + """Helper function to return warranty claim process for a product""" + if "SunPower" in product_name: + return "Submit claim through SunPower warranty portal with serial number. Customer support will arrange inspection within 5-7 business days." + else: + return "Contact customer support with product serial number to initiate warranty claim process." 
\ No newline at end of file diff --git a/02-samples/11-customer-support-solar-panel/data/solar-panel-instructions-mannual.txt b/02-samples/11-customer-support-solar-panel/data/solar-panel-instructions-mannual.txt new file mode 100644 index 00000000..d0d0f3a0 --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/data/solar-panel-instructions-mannual.txt @@ -0,0 +1,84 @@ +# Home Solar Panel Installation Guide + +## SunPower X Model + +**Description:** The SunPower X is a mid-range residential solar panel with excellent efficiency for its price point. It features a sleek black design with minimal grid lines, making it aesthetically pleasing for most home installations. + +**Features:** +- 19.8% efficiency rating +- 320-watt output per panel +- 25-year product and performance warranty +- Integrated microinverter system +- Weather-resistant frame + +### Installation Steps for SunPower X: +1. Conduct a site assessment to determine optimal placement and sun exposure +2. Secure necessary permits from local building department +3. Purchase mounting hardware compatible with your roof type +4. Install mounting rails according to manufacturer specifications +5. Attach the microinverters to the mounting rails +6. Connect the microinverters to your home's electrical system +7. Mount the SunPower X panels onto the rails +8. Connect the panels to the microinverters +9. Install monitoring system to track performance +10. Schedule inspection with local building department for final approval + +## SunPower Y Model + +**Description:** The SunPower Y is an entry-level solar panel designed for budget-conscious homeowners. Despite its lower price point, it offers reliable performance and durability. + +**Features:** +- 17.5% efficiency rating +- 290-watt output per panel +- 20-year product warranty +- String inverter compatible +- Enhanced durability in harsh weather conditions + +### Installation Steps for SunPower Y: +1. Complete a roof structural assessment +2. 
Obtain required permits and utility interconnection agreement +3. Install roof attachments according to local building codes +4. Mount racking system to roof attachments +5. Install string inverter near main electrical panel +6. Place and secure SunPower Y panels onto the racking system +7. Wire panels together in series to create strings +8. Connect strings to the inverter +9. Connect inverter to home's electrical system +10. Install monitoring equipment +11. Schedule final inspection with local authorities + +## SunPower Double-X Model + +**Description:** The SunPower Double-X is a premium bifacial solar panel that captures sunlight from both sides, maximizing energy production. It's ideal for installations with reflective surfaces below or for ground-mounted systems. + +**Features:** +- 22.3% front-side efficiency, 8% additional from rear-side capture +- 400-watt output per panel +- 30-year comprehensive warranty +- Dual-glass construction +- Advanced temperature coefficient for better performance in heat + +### Installation Steps for SunPower Double-X: +1. Perform comprehensive site analysis including reflection potential +2. Secure enhanced structural permits due to additional weight +3. Install specialized elevated mounting system to allow light reflection +4. Connect power optimizers to each panel location +5. Mount the Double-X panels with proper spacing for airflow +6. Install central inverter system +7. Connect panels to power optimizers +8. Wire power optimizers to central inverter +9. Configure monitoring system for bifacial performance tracking +10. Connect system to home electrical panel through appropriate disconnect switches +11. Schedule specialized bifacial system inspection + +## Compliance with Energy Rules: +1. Research local, state, and federal regulations before installation +2. Ensure your installer is certified by the North American Board of Certified Energy Practitioners (NABCEP) +3. Obtain all required building and electrical permits +4. 
Follow National Electrical Code (NEC) requirements for solar installations +5. Contact your utility company about interconnection requirements +6. Install required rapid shutdown systems for firefighter safety +7. Maintain proper setbacks from roof edges as required by local code +8. Schedule all required inspections during and after installation +9. Register your system with local utility for net metering benefits +10. Keep documentation of all permits and inspections for tax credit purposes \ No newline at end of file diff --git a/02-samples/11-customer-support-solar-panel/data/solar-panel-maintenance-mannual.txt b/02-samples/11-customer-support-solar-panel/data/solar-panel-maintenance-mannual.txt new file mode 100644 index 00000000..f63e192e --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/data/solar-panel-maintenance-mannual.txt @@ -0,0 +1,49 @@ +# Maintenance Guide for SunPower Solar Panel Models + +## SunPower X Model Maintenance +1. Inspect panels visually every 3 months for dirt, debris, or damage +2. Clean panels quarterly using distilled water and a soft brush on a telescoping pole +3. Avoid using harsh chemicals or abrasive materials when cleaning +4. Check microinverter indicator lights monthly to ensure proper operation +5. Monitor system performance through the provided app or web portal +6. Clear any vegetation that may begin to shade the panels +7. After severe weather, perform additional visual inspections for damage +8. Verify mounting hardware tightness annually +9. Check electrical connections for corrosion once per year +10. Schedule professional inspection every 2-3 years to maintain warranty validity + +## SunPower Y Model Maintenance +1. Visually inspect panels every 3-4 months for accumulated debris +2. Clean panels with soft cloth and mild soap solution twice yearly +3. Check string inverter display regularly for error codes or warnings +4. Clear leaves and debris from around the string inverter to maintain airflow +5. 
Compare energy production reports monthly to identify potential issues +6. Inspect roof attachments and racking system annually for security +7. Test monitoring equipment quarterly to ensure accurate reporting +8. Trim trees or vegetation that may cast shadows on panels +9. After storms, check for physical damage or loose components +10. Have an electrician inspect wiring connections annually + +## SunPower Double-X Model Maintenance +1. Clean both front and back surfaces of panels quarterly +2. Ensure the area beneath elevated panels remains reflective and clear of debris +3. Check specialized mounting system for stability every 6 months +4. Monitor bifacial performance metrics to ensure rear-side generation is optimal +5. Inspect power optimizers annually for proper operation +6. Maintain proper clearance around central inverter for cooling +7. Clean or replace air filters on the central inverter system as needed +8. Verify dual-glass construction for any cracks or moisture intrusion twice yearly +9. Test disconnect switches annually to ensure proper emergency operation +10. Schedule professional bifacial system inspection annually to maintain extended warranty + +## Ensuring Compliance with Energy Rules +1. Keep all installation permits, inspection certificates, and warranty documents in a dedicated file +2. Review utility interconnection agreement annually for any policy changes +3. Maintain NABCEP certification records of your installer for warranty claims +4. Schedule required reinspections as mandated by local building codes +5. Update monitoring system firmware when available to maintain compliance +6. Register any system modifications with your utility company +7. Maintain rapid shutdown system functionality through annual testing +8. Keep records of all maintenance activities for warranty and insurance purposes +9. Review net metering credits monthly to ensure proper utility accounting +10. 
Stay informed about changes to local solar regulations that may affect your system \ No newline at end of file diff --git a/02-samples/11-customer-support-solar-panel/images/JIRA.png b/02-samples/11-customer-support-solar-panel/images/JIRA.png new file mode 100644 index 00000000..f917cb7c Binary files /dev/null and b/02-samples/11-customer-support-solar-panel/images/JIRA.png differ diff --git a/02-samples/11-customer-support-solar-panel/images/architecture.png b/02-samples/11-customer-support-solar-panel/images/architecture.png new file mode 100644 index 00000000..773a757b Binary files /dev/null and b/02-samples/11-customer-support-solar-panel/images/architecture.png differ diff --git a/02-samples/11-customer-support-solar-panel/images/langfuse_traces.png b/02-samples/11-customer-support-solar-panel/images/langfuse_traces.png new file mode 100644 index 00000000..fdf4b89b Binary files /dev/null and b/02-samples/11-customer-support-solar-panel/images/langfuse_traces.png differ diff --git a/02-samples/11-customer-support-solar-panel/images/mem0_memories.png b/02-samples/11-customer-support-solar-panel/images/mem0_memories.png new file mode 100644 index 00000000..a0f4c10a Binary files /dev/null and b/02-samples/11-customer-support-solar-panel/images/mem0_memories.png differ diff --git a/02-samples/11-customer-support-solar-panel/requirements.txt b/02-samples/11-customer-support-solar-panel/requirements.txt new file mode 100644 index 00000000..4a395a8b --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/requirements.txt @@ -0,0 +1,14 @@ +boto3 +botocore +awscli +opensearch-py +requests-aws4auth +pyyaml +retrying +pandas +strands-agents +strands-agents-tools +mem0ai +python-dotenv +langfuse +atlassian-python-api \ No newline at end of file diff --git a/02-samples/11-customer-support-solar-panel/utils/customer_dynamodb.py b/02-samples/11-customer-support-solar-panel/utils/customer_dynamodb.py new file mode 100644 index 00000000..17906982 --- /dev/null 
+++ b/02-samples/11-customer-support-solar-panel/utils/customer_dynamodb.py @@ -0,0 +1,379 @@ +import boto3 +import os +from boto3.session import Session +from datetime import datetime +import sys + +# Add parent directory to path so we can import from parent directory +parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(parent_dir) + +class SolarCustomerDynamoDB: + """ + Support class for managing customer profiles in DynamoDB. + Provides functions to create, delete, and populate the customer table. + """ + + def __init__(self, region_name=None): + """ + Initialize the DynamoDB manager with optional AWS region + + Args: + region_name: Optional AWS region name + """ + self._boto_session = Session() + self._region = region_name or self._boto_session.region_name + self._dynamodb_client = boto3.client("dynamodb", region_name=self._region) + self._dynamodb_resource = boto3.resource("dynamodb", region_name=self._region) + self._ssm_client = boto3.client('ssm', region_name=self._region) + + # Default table name - can be overridden in function calls + self.table_name = 'SolarCustomerProfiles' + + def create_table(self, table_name=None): + """ + Create a DynamoDB table for customer profiles + + Args: + table_name: Optional table name (defaults to SolarCustomerProfiles) + + Returns: + DynamoDB Table object + """ + table_name = table_name or self.table_name + + try: + table = self._dynamodb_resource.create_table( + TableName=table_name, + KeySchema=[ + {"AttributeName": "customer_id", "KeyType": "HASH"}, # Partition key + ], + AttributeDefinitions=[ + {"AttributeName": "customer_id", "AttributeType": "S"}, # customer_id + {"AttributeName": "email", "AttributeType": "S"}, # email + ], + GlobalSecondaryIndexes=[ + { + 'IndexName': 'EmailIndex', + 'KeySchema': [ + {'AttributeName': "email", 'KeyType': 'HASH'}, + ], + 'Projection': { + 'ProjectionType': 'ALL' + } + }, + ], + BillingMode="PAY_PER_REQUEST", # Use on-demand capacity mode + ) + 
+ # Wait for the table to be created + print(f'Creating table {table_name}...') + table.wait_until_exists() + print(f'Table {table_name} created successfully!') + + # Store table name in parameter store for easy retrieval + self._ssm_client.put_parameter( + Name=f'solar-customer-table-name', + Description=f'Solar customer profile table name', + Value=table_name, + Type='String', + Overwrite=True + ) + return table + except self._dynamodb_client.exceptions.ResourceInUseException: + print(f"Table {table_name} already exists, retrieving it") + # Update parameter store anyway to ensure it's current + self._ssm_client.put_parameter( + Name=f'solar-customer-table-name', + Description=f'Solar customer profile table name', + Value=table_name, + Type='String', + Overwrite=True + ) + return self._dynamodb_resource.Table(table_name) + + def delete_table(self, table_name=None): + """ + Delete the DynamoDB customer profiles table and clean up SSM parameter + + Args: + table_name: Optional table name (defaults to SolarCustomerProfiles) + + Returns: + True if successful, False otherwise + """ + table_name = table_name or self.table_name + cleaned_up_ssm = False + cleaned_up_table = False + + # First check if the table exists before trying to delete + if not self.table_exists(table_name): + print(f"Table {table_name} does not exist, no need to delete.") + cleaned_up_table = True + else: + try: + self._dynamodb_client.delete_table(TableName=table_name) + print(f"Table {table_name} is being deleted...") + waiter = self._dynamodb_client.get_waiter('table_not_exists') + waiter.wait(TableName=table_name) + print(f"Table {table_name} has been deleted.") + cleaned_up_table = True + except Exception as e: + print(f"Error deleting table {table_name}: {e}") + + # Always try to clean up SSM parameter regardless of table existence + try: + # Check if the SSM parameter exists and if it points to the table we're deleting + ssm_table = self.get_table_name_from_ssm() + if ssm_table and (ssm_table 
== table_name or table_name is None): + self._ssm_client.delete_parameter(Name='solar-customer-table-name') + print(f"SSM parameter 'solar-customer-table-name' deleted.") + cleaned_up_ssm = True + except self._ssm_client.exceptions.ParameterNotFound: + print(f"SSM parameter 'solar-customer-table-name' not found.") + cleaned_up_ssm = True + except Exception as e: + print(f"Error cleaning up SSM parameter: {e}") + + return cleaned_up_table and cleaned_up_ssm + + def generate_synthetic_profiles(self, count=10, table_name=None): + """ + Generate synthetic customer profiles directly in DynamoDB + + Args: + count: Number of profiles to generate (default: 10) + table_name: Optional table name (defaults to SolarCustomerProfiles) + + Returns: + List of customer IDs created + """ + table_name = table_name or self.table_name + table = self._dynamodb_resource.Table(table_name) + + countries = ["USA", "Canada", "Australia", "UK", "Germany"] + states = { + "USA": ["California", "Texas", "New York", "Florida", "Washington"], + "Canada": ["Ontario", "Quebec", "British Columbia", "Alberta"], + "Australia": ["New South Wales", "Victoria", "Queensland"], + "UK": ["England", "Scotland", "Wales"], + "Germany": ["Bavaria", "Berlin", "Hesse"] + } + products = [ + {"name": "SunPower X", "price": 1200, "type": "panel"}, + {"name": "SunPower Y", "price": 800, "type": "panel"}, + {"name": "SunPower Double-X", "price": 1600, "type": "panel"}, + {"name": "PowerWall Battery", "price": 5000, "type": "battery"}, + {"name": "SolarInverter X1", "price": 1500, "type": "inverter"}, + {"name": "EcoCharge Controller", "price": 300, "type": "controller"} + ] + # No ticket types needed - tickets are managed in JIRA only + + created_profiles = [] + + for i in range(count): + customer_id = f"CUST{100+i}" + name = f"Customer {i+1}" + email = f"customer{i+1}@example.com" + country = countries[i % len(countries)] + state = states[country][i % len(states[country])] + + # Generate purchase history + 
purchase_count = (i % 3) + 1 # 1-3 purchases + purchases = [] + for j in range(purchase_count): + product = products[(i+j) % len(products)] + purchase_date = datetime.now().replace( + month=((i+j) % 12) + 1, + day=((i*j) % 28) + 1 + ).isoformat() + + purchases.append({ + "purchase_id": f"PUR{100+i}{j}", + "product_name": product["name"], + "product_type": product["type"], + "price": product["price"], + "quantity": (j % 2) + 1, + "purchase_date": purchase_date + }) + + # Support tickets removed - now managed in JIRA only + + # Generate preferences + preferences = { + "contact_preference": "email" if i % 2 == 0 else "phone", + "newsletter": i % 3 == 0, + "maintenance_reminder": i % 2 == 0 + } + + # Create profile directly in DynamoDB + profile_data = { + "customer_id": customer_id, + "name": name, + "email": email, + "country": country, + "state": state, + "purchase_history": purchases, + "preferences": preferences, + "created_at": datetime.now().isoformat(), + "updated_at": datetime.now().isoformat() + } + + # Store in DynamoDB + table.put_item(Item=profile_data) + created_profiles.append(customer_id) + + print(f"Generated and stored {len(created_profiles)} synthetic customer profiles") + return created_profiles + + def get_profile_by_id(self, customer_id, table_name=None): + """ + Retrieve a customer profile by customer ID + + Args: + customer_id: The customer ID to look up + table_name: Optional table name (defaults to SolarCustomerProfiles) + + Returns: + Customer profile dict or None if not found + """ + table_name = table_name or self.table_name + table = self._dynamodb_resource.Table(table_name) + + try: + response = table.get_item(Key={'customer_id': customer_id}) + return response.get('Item') + except Exception as e: + print(f"Error retrieving customer profile: {e}") + return None + + def get_profile_by_email(self, email, table_name=None): + """ + Retrieve a customer profile by email + + Args: + email: The customer email to look up + table_name: Optional 
table name (defaults to SolarCustomerProfiles) + + Returns: + Customer profile dict or None if not found + """ + table_name = table_name or self.table_name + table = self._dynamodb_resource.Table(table_name) + + try: + response = table.query( + IndexName='EmailIndex', + KeyConditionExpression=boto3.dynamodb.conditions.Key('email').eq(email) + ) + if response.get('Items'): + return response['Items'][0] + return None + except Exception as e: + print(f"Error retrieving customer profile by email: {e}") + return None + + def update_profile(self, customer_id, updates, table_name=None): + """ + Update a customer profile with new information + + Args: + customer_id: The customer ID to update + updates: Dictionary containing fields to update + table_name: Optional table name (defaults to SolarCustomerProfiles) + + Returns: + Updated profile dict or None if failed + """ + table_name = table_name or self.table_name + table = self._dynamodb_resource.Table(table_name) + + try: + # Check if customer exists + profile = self.get_profile_by_id(customer_id, table_name) + if not profile: + print(f"Customer with ID {customer_id} not found") + return None + + # Add updated timestamp + updates['updated_at'] = datetime.now().isoformat() + + # Prepare update expression + update_expression = "SET " + expression_attr_values = {} + expression_attr_names = {} + + for key, value in updates.items(): + update_expression += f"#{key} = :{key}, " + expression_attr_values[f":{key}"] = value + expression_attr_names[f"#{key}"] = key + + # Remove trailing comma and space + update_expression = update_expression[:-2] + + # Update the profile + response = table.update_item( + Key={'customer_id': customer_id}, + UpdateExpression=update_expression, + ExpressionAttributeNames=expression_attr_names, + ExpressionAttributeValues=expression_attr_values, + ReturnValues="ALL_NEW" + ) + + return response.get('Attributes') + except Exception as e: + print(f"Error updating customer profile: {e}") + return None + + 
def get_table_name_from_ssm(self): + """ + Retrieves the customer profile table name from SSM Parameter Store + + Returns: + Table name or None if parameter not found + """ + try: + response = self._ssm_client.get_parameter(Name='solar-customer-table-name') + return response['Parameter']['Value'] + except self._ssm_client.exceptions.ParameterNotFound: + return None + + def table_exists(self, table_name): + """ + Checks if the DynamoDB table actually exists + + Args: + table_name: The table name to check + + Returns: + True if table exists, False otherwise + """ + try: + self._dynamodb_client.describe_table(TableName=table_name) + return True + except self._dynamodb_client.exceptions.ResourceNotFoundException: + return False + + +# Example usage (can be directly called from notebooks): +if __name__ == "__main__": + # This code will only run when the module is executed directly + # Useful for testing + + db = SolarCustomerDynamoDB() + + # Create table example + table = db.create_table() + + # Generate profiles example + db.generate_synthetic_profiles(count=5) + + # Get profile example + profile = db.get_profile_by_id("CUST100") + if profile: + print(f"Found customer: {profile['name']}") + + # Update profile example + updated = db.update_profile("CUST100", {"name": "Updated Customer Name"}) + if updated: + print(f"Updated customer name: {updated['name']}") \ No newline at end of file diff --git a/02-samples/11-customer-support-solar-panel/utils/jira_tools.py b/02-samples/11-customer-support-solar-panel/utils/jira_tools.py new file mode 100644 index 00000000..14f7eb91 --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/utils/jira_tools.py @@ -0,0 +1,161 @@ +# Consider this for Atlassian MCP server: https://github.com/sooperset/mcp-atlassian + +import os +from typing import Literal, Dict + +from atlassian import Jira +from dotenv import load_dotenv +from strands import tool + +# Load environment variables from .env file +load_dotenv() + +# Initialize 
constants +PROJECT_NAME = os.getenv("PROJECT_NAME", "SOLAR") +JIRA_URL = os.getenv("JIRA_INSTANCE_URL", "") +JIRA_USERNAME = os.getenv("JIRA_USERNAME") +JIRA_API_TOKEN = os.getenv("JIRA_API_TOKEN") +JIRA_CLOUD = os.getenv("JIRA_CLOUD", "false").lower() == "true" + +# Initialize Jira client if credentials are available +jira = None +PROJECT_KEY = None + +if JIRA_URL and JIRA_USERNAME and JIRA_API_TOKEN: + try: + jira = Jira( + url=JIRA_URL, + username=JIRA_USERNAME, + password=JIRA_API_TOKEN, + cloud=JIRA_CLOUD, + ) + + def get_project_key(project_name: str) -> str: + """Retrieve the Jira project key for a given project name.""" + projects = jira.projects(expand=None) + if not projects: + raise RuntimeError("No projects found in Jira instance.") + + project_dict = {project["name"]: project["key"] for project in projects} + if project_name not in project_dict: + raise ValueError(f"Project '{project_name}' not found in Jira.") + + return project_dict[project_name] + + # Attempt to get project key on module load + try: + PROJECT_KEY = get_project_key(PROJECT_NAME) + except Exception as e: + print(f"Warning: Failed to initialize project key: {e}") + except Exception as e: + print(f"Warning: Failed to initialize Jira client: {e}") +else: + print("Warning: Jira integration disabled due to missing credentials") + + +@tool +def create_solar_support_ticket( + customer_id: str, + title: str, + description: str, + ticket_type: Literal["Installation", "Maintenance", "Performance", "Billing", "Technical"], + customer_email: str = None # Add email for better customer linking +) -> Dict: + """ + Create a solar panel support ticket in Jira. 
+ + Args: + customer_id (str): ID of the customer with the issue + title (str): One-line summary of the support issue + description (str): Detailed description of the issue + ticket_type (Literal): Type of solar panel issue + + Returns: + dict: Information about the created ticket or error message + """ + if not jira or not PROJECT_KEY: + return { + "status": "error", + "message": "Jira integration is not properly configured" + } + + # Add customer information to description + customer_info = f"Customer ID: {customer_id}\n" + if customer_email: + customer_info += f"Customer Email: {customer_email}\n" + full_description = f"{customer_info}\n{description}" + + # Map our ticket types to Jira issue types + # Assuming Jira has "Task" issue type for all solar support tickets + issue_type = "Task" + + issue_payload = { + "project": {"key": PROJECT_KEY}, + "summary": title, + "description": full_description, + "issuetype": {"name": issue_type}, + "labels": [ticket_type, f"customer-{customer_id}"], # Add customer ID as label for easier searching + } + + try: + response = jira.issue_create_or_update(issue_payload) + ticket_key = response.get('key', 'Unknown') + return { + "status": "success", + "message": f"{ticket_type} support ticket created successfully", + "ticket_key": ticket_key + } + except Exception as e: + return { + "status": "error", + "message": f"Failed to create Jira ticket: {e}" + } + + +@tool +def get_customer_tickets(customer_id: str) -> Dict: + """ + Get all support tickets for a specific customer. 
+ + Args: + customer_id (str): ID of the customer to look up + + Returns: + dict: Information about the customer's tickets or error message + """ + if not jira or not PROJECT_KEY: + return { + "status": "error", + "message": "Jira integration is not properly configured" + } + + try: + # Search for customer ID in ticket description + # Note: In a real system, you might have a custom field for customer ID + jql_query = f'project = {PROJECT_KEY} AND description ~ "Customer ID: {customer_id}"' + + response = jira.jql(jql_query) + issues = response.get('issues', []) + + tickets = [] + for issue in issues: + ticket = { + "key": issue.get('key'), + "summary": issue.get('fields', {}).get('summary'), + "status": issue.get('fields', {}).get('status', {}).get('name'), + "type": issue.get('fields', {}).get('issuetype', {}).get('name'), + "created": issue.get('fields', {}).get('created'), + "updated": issue.get('fields', {}).get('updated') + } + tickets.append(ticket) + + return { + "status": "success", + "count": len(tickets), + "tickets": tickets + } + except Exception as e: + return { + "status": "error", + "message": f"Failed to retrieve customer tickets: {e}" + } \ No newline at end of file diff --git a/02-samples/11-customer-support-solar-panel/utils/knowledge_base.py b/02-samples/11-customer-support-solar-panel/utils/knowledge_base.py new file mode 100644 index 00000000..96d7555a --- /dev/null +++ b/02-samples/11-customer-support-solar-panel/utils/knowledge_base.py @@ -0,0 +1,1305 @@ +import json +import boto3 +import time +from botocore.exceptions import ClientError +from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError +import pprint +from retrying import retry +import zipfile +from io import BytesIO +import warnings +import random +warnings.filterwarnings('ignore') + +valid_generation_models = ["anthropic.claude-3-5-sonnet-20240620-v1:0", + "anthropic.claude-3-5-haiku-20241022-v1:0", + "anthropic.claude-3-sonnet-20240229-v1:0", + 
"anthropic.claude-3-haiku-20240307-v1:0", + "amazon.nova-micro-v1:0"] + +valid_reranking_models = ["cohere.rerank-v3-5:0", + "amazon.rerank-v1:0"] + +valid_embedding_models = ["cohere.embed-multilingual-v3", + "cohere.embed-english-v3", + "amazon.titan-embed-text-v1", + "amazon.titan-embed-text-v2:0"] + +embedding_context_dimensions = { + "cohere.embed-multilingual-v3": 512, + "cohere.embed-english-v3": 512, + "amazon.titan-embed-text-v1": 1536, + "amazon.titan-embed-text-v2:0": 1024 +} + +pp = pprint.PrettyPrinter(indent=2) + +def interactive_sleep(seconds: int): + dots = '' + for i in range(seconds): + dots += '.' + print(dots, end='\r') + time.sleep(1) + +class BedrockKnowledgeBase: + """ + Support class that allows for: + - creation (or retrieval) of a Knowledge Base for Amazon Bedrock with all its pre-requisites + (including OSS, IAM roles and Permissions and S3 bucket) + - Ingestion of data into the Knowledge Base + - Deletion of all resources created + """ + def __init__( + self, + kb_name=None, + kb_description=None, + data_sources=None, + multi_modal=None, + parser=None, + intermediate_bucket_name=None, + lambda_function_name=None, + embedding_model="amazon.titan-embed-text-v2:0", + generation_model="anthropic.claude-3-sonnet-20240229-v1:0", + reranking_model="cohere.rerank-v3-5:0", + graph_model="anthropic.claude-3-haiku-20240307-v1:0", + chunking_strategy="FIXED_SIZE", + suffix=None, + vector_store="OPENSEARCH_SERVERLESS" # can be OPENSEARCH_SERVERLESS or NEPTUNE_ANALYTICS + ): + """ + Class initializer + Args: + kb_name(str): The name of the Knowledge Base. + kb_description(str): The description of the Knowledge Base. + data_sources(list): The list of data source used for the Knowledge Base. + multi_modal(bool): Whether the Knowledge Base supports multi-modal data. + parser(str): The parser to be used for the Knowledge Base. + intermediate_bucket_name(str): The name of the intermediate S3 bucket to be used for custom chunking strategy. 
+ lambda_function_name(str): The name of the Lambda function to be used for custom chunking strategy. + embedding_model(str): The embedding model to be used for the Knowledge Base. + generation_model(str): The generation model to be used for the Knowledge Base. + reranking_model(str): The reranking model to be used for the Knowledge Base. + chunking_strategy(str): The chunking strategy to be used for the Knowledge Base. + suffix(str): A suffix to be used for naming resources. + """ + + boto3_session = boto3.session.Session() + self.region_name = boto3_session.region_name + self.iam_client = boto3_session.client('iam') + self.lambda_client = boto3.client('lambda') + self.account_number = boto3.client('sts').get_caller_identity().get('Account') + self.suffix = suffix or f'{self.region_name}-{self.account_number}' + self.identity = boto3.client('sts').get_caller_identity()['Arn'] + self.aoss_client = boto3_session.client('opensearchserverless') + self.neptune_client = boto3.client('neptune-graph') + self.s3_client = boto3.client('s3') + self.bedrock_agent_client = boto3.client('bedrock-agent') + credentials = boto3.Session().get_credentials() + self.awsauth = AWSV4SignerAuth(credentials, self.region_name, 'aoss') + + self.kb_name = kb_name or f"default-knowledge-base-{self.suffix}" + self.vector_store = vector_store + self.graph_name = self.kb_name + self.kb_description = kb_description or "Default Knowledge Base" + + self.data_sources = data_sources + self.bucket_names=[d["bucket_name"] for d in self.data_sources if d['type']== 'S3'] + self.secrets_arns = [d["credentialsSecretArn"] for d in self.data_sources if d['type']== 'CONFLUENCE'or d['type']=='SHAREPOINT' or d['type']=='SALESFORCE'] + self.chunking_strategy = chunking_strategy + self.multi_modal = multi_modal + self.parser = parser + + if multi_modal or chunking_strategy == "CUSTOM" : + self.intermediate_bucket_name = intermediate_bucket_name or f"{self.kb_name}-intermediate-{self.suffix}" + 
self.lambda_function_name = lambda_function_name or f"{self.kb_name}-lambda-{self.suffix}" + else: + self.intermediate_bucket_name = None + self.lambda_function_name = None + + self.embedding_model = embedding_model + self.generation_model = generation_model + self.reranking_model = reranking_model + self.graph_model = graph_model + + self._validate_models() + + self.encryption_policy_name = f"bedrock-sample-rag-sp-{self.suffix}" + self.network_policy_name = f"bedrock-sample-rag-np-{self.suffix}" + self.access_policy_name = f'bedrock-sample-rag-ap-{self.suffix}' + self.kb_execution_role_name = f'AmazonBedrockExecutionRoleForKnowledgeBase_{self.suffix}' + self.fm_policy_name = f'AmazonBedrockFoundationModelPolicyForKnowledgeBase_{self.suffix}' + self.s3_policy_name = f'AmazonBedrockS3PolicyForKnowledgeBase_{self.suffix}' + self.sm_policy_name = f'AmazonBedrockSecretPolicyForKnowledgeBase_{self.suffix}' + self.cw_log_policy_name = f'AmazonBedrockCloudWatchPolicyForKnowledgeBase_{self.suffix}' + self.oss_policy_name = f'AmazonBedrockOSSPolicyForKnowledgeBase_{self.suffix}' + self.lambda_policy_name = f'AmazonBedrockLambdaPolicyForKnowledgeBase_{self.suffix}' + self.bda_policy_name = f'AmazonBedrockBDAPolicyForKnowledgeBase_{self.suffix}' + self.neptune_policy_name = f'AmazonBedrockNeptunePolicyForKnowledgeBase_{self.suffix}' + self.lambda_arn = None + self.roles = [self.kb_execution_role_name] + + self.vector_store_name = f'bedrock-sample-rag-{self.suffix}' + self.index_name = f"bedrock-sample-rag-index-{self.suffix}" + self.graph_id = None + + self._setup_resources() + + def _validate_models(self): + if self.embedding_model not in valid_embedding_models: + raise ValueError(f"Invalid embedding model. Your embedding model should be one of {valid_embedding_models}") + if self.generation_model not in valid_generation_models: + raise ValueError(f"Invalid Generation model. 
Your generation model should be one of {valid_generation_models}") + if self.reranking_model not in valid_reranking_models: + raise ValueError(f"Invalid Reranking model. Your reranking model should be one of {valid_reranking_models}") + + def _setup_resources(self): + print("========================================================================================") + print(f"Step 1 - Creating or retrieving S3 bucket(s) for Knowledge Base documents") + self.create_s3_bucket() + + print("========================================================================================") + print(f"Step 2 - Creating Knowledge Base Execution Role ({self.kb_execution_role_name}) and Policies") + self.bedrock_kb_execution_role = self.create_bedrock_execution_role_multi_ds(self.bucket_names, self.secrets_arns) + self.bedrock_kb_execution_role_name = self.bedrock_kb_execution_role['Role']['RoleName'] + + if self.vector_store == "OPENSEARCH_SERVERLESS": + print("========================================================================================") + print(f"Step 3a - Creating OSS encryption, network and data access policies") + self.encryption_policy, self.network_policy, self.access_policy = self.create_policies_in_oss() + + print("========================================================================================") + print(f"Step 3b - Creating OSS Collection (this step takes a couple of minutes to complete)") + self.host, self.collection, self.collection_id, self.collection_arn = self.create_oss() + self.oss_client = OpenSearch( + hosts=[{'host': self.host, 'port': 443}], + http_auth=self.awsauth, + use_ssl=True, + verify_certs=True, + connection_class=RequestsHttpConnection, + timeout=300 + ) + + print("========================================================================================") + print(f"Step 3c - Creating OSS Vector Index") + self.create_vector_index() + else: + print("========================================================================================") 
+ print(f"Step 3 - Creating Neptune Analytics Graph Index: might take upto 5-7 minutes") + self.graph_id = self.create_neptune() + + + + print("========================================================================================") + print(f"Step 4 - Will create Lambda Function if chunking strategy selected as CUSTOM") + if self.chunking_strategy == "CUSTOM": + print(f"Creating lambda function... as chunking strategy is {self.chunking_strategy}") + response = self.create_lambda() + self.lambda_arn = response['FunctionArn'] + print(response) + print(f"Lambda function ARN: {self.lambda_arn}") + else: + print(f"Not creating lambda function as chunking strategy is {self.chunking_strategy}") + + print("========================================================================================") + print(f"Step 5 - Creating Knowledge Base") + self.knowledge_base, self.data_source = self.create_knowledge_base(self.data_sources) + print("========================================================================================") + + def create_s3_bucket(self, multi_modal=False): + + buckets_to_check = self.bucket_names.copy() + # if multi_modal: + # buckets_to_check.append(buckets_to_check[0] + '-multi-modal-storage') + + if self.multi_modal or self.chunking_strategy == "CUSTOM": + buckets_to_check.append(self.intermediate_bucket_name) + + print(buckets_to_check) + print('buckets_to_check: ', buckets_to_check) + + existing_buckets = [] + for bucket_name in buckets_to_check: + try: + self.s3_client.head_bucket(Bucket=bucket_name) + existing_buckets.append(bucket_name) + print(f'Bucket {bucket_name} already exists - retrieving it!') + except ClientError: + pass + + buckets_to_create = [b for b in buckets_to_check if b not in existing_buckets] + + for bucket_name in buckets_to_create: + print(f'Creating bucket {bucket_name}') + if self.region_name == "us-east-1": + self.s3_client.create_bucket(Bucket=bucket_name) + else: + self.s3_client.create_bucket( + Bucket=bucket_name, + 
CreateBucketConfiguration={'LocationConstraint': self.region_name} + ) + + def create_lambda(self): + # add to function + lambda_iam_role = self.create_lambda_role() + self.lambda_iam_role_name = lambda_iam_role['Role']['RoleName'] + self.roles.append(self.lambda_iam_role_name) + # Package up the lambda function code + s = BytesIO() + z = zipfile.ZipFile(s, 'w') + z.write("lambda_function.py") + z.close() + zip_content = s.getvalue() + + # Create Lambda Function + lambda_function = self.lambda_client.create_function( + FunctionName=self.lambda_function_name, + Runtime='python3.12', + Timeout=60, + Role=lambda_iam_role['Role']['Arn'], + Code={'ZipFile': zip_content}, + Handler='lambda_function.lambda_handler' + ) + return lambda_function + + def create_lambda_role(self): + lambda_function_role = f'{self.kb_name}-lambda-role-{self.suffix}' + s3_access_policy_name = f'{self.kb_name}-s3-policy' + # Create IAM Role for the Lambda function + try: + assume_role_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] + } + + assume_role_policy_document_json = json.dumps(assume_role_policy_document) + + lambda_iam_role = self.iam_client.create_role( + RoleName=lambda_function_role, + AssumeRolePolicyDocument=assume_role_policy_document_json + ) + + # Pause to make sure role is created + time.sleep(10) + except self.iam_client.exceptions.EntityAlreadyExistsException: + lambda_iam_role = self.iam_client.get_role(RoleName=lambda_function_role) + + # Attach the AWSLambdaBasicExecutionRole policy + self.iam_client.attach_role_policy( + RoleName=lambda_function_role, + PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' + ) + + # Create a policy to grant access to the intermediate S3 bucket + s3_access_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + 
"s3:ListBucket", + "s3:PutObject" + ], + "Resource": [ + f"arn:aws:s3:::{self.intermediate_bucket_name}", + f"arn:aws:s3:::{self.intermediate_bucket_name}/*" + ], + "Condition": { + "StringEquals": { + "aws:ResourceAccount": f"{self.account_number}" + } + } + } + ] + } + + # Create the policy + s3_access_policy_json = json.dumps(s3_access_policy) + s3_access_policy_response = self.iam_client.create_policy( + PolicyName=s3_access_policy_name, + PolicyDocument= s3_access_policy_json + ) + + # Attach the policy to the Lambda function's role + self.iam_client.attach_role_policy( + RoleName=lambda_function_role, + PolicyArn=s3_access_policy_response['Policy']['Arn'] + ) + return lambda_iam_role + + def create_bedrock_execution_role_multi_ds(self, bucket_names=None, secrets_arns=None): + """ + Create Knowledge Base Execution IAM Role and its required policies. + If role and/or policies already exist, retrieve them + Returns: + IAM role + """ + + bucket_names = self.bucket_names.copy() + if self.intermediate_bucket_name: + bucket_names.append(self.intermediate_bucket_name) + + # 1. Create and attach policy for foundation models + foundation_model_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "bedrock:InvokeModel", + ], + "Resource": [ + f"arn:aws:bedrock:{self.region_name}::foundation-model/{self.embedding_model}", + f"arn:aws:bedrock:{self.region_name}::foundation-model/{self.generation_model}", + f"arn:aws:bedrock:{self.region_name}::foundation-model/{self.reranking_model}", + f"arn:aws:bedrock:{self.region_name}::foundation-model/{self.graph_model}" + ] + } + ] + } + + # 2. 
Define policy documents for s3 bucket + if bucket_names: + s3_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:ListBucket", + "s3:PutObject", + "s3:DeleteObject" + ], + "Resource": [item for sublist in [[f'arn:aws:s3:::{bucket}', f'arn:aws:s3:::{bucket}/*'] for bucket in bucket_names] for item in sublist], + "Condition": { + "StringEquals": { + "aws:ResourceAccount": f"{self.account_number}" + } + } + } + ] + } + if self.vector_store == "NEPTUNE_ANALYTICS": + neptune_policy_name = { + "Version": "2012-10-17", + "Statement": [{ + "Sid": "NeptuneAnalyticsAccess", + "Effect": "Allow", + "Action": [ + "*" + ], + "Resource": f"arn:aws:neptune-graph:{self.region_name}:{self.account_number}:graph/*" + } + ] + } + + + # 3. Define policy documents for secrets manager + if secrets_arns: + secrets_manager_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "secretsmanager:GetSecretValue", + "secretsmanager:PutSecretValue" + ], + "Resource": secrets_arns + } + ] + } + + # 4. Define policy documents for BDA + bda_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "BDAGetStatement", + "Effect": "Allow", + "Action": [ + "bedrock:GetDataAutomationStatus" + ], + "Resource": f"arn:aws:bedrock:{self.region_name}:{self.account_number}:data-automation-invocation/*" + }, + { + "Sid": "BDAInvokeStatement", + "Effect": "Allow", + "Action": [ + "bedrock:InvokeDataAutomationAsync" + ], + "Resource": f"arn:aws:bedrock:{self.region_name}:aws:data-automation-project/public-rag-default" + } + ] + } + + + # 5. 
Define policy documents for lambda + if self.chunking_strategy == "CUSTOM": + lambda_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "LambdaInvokeFunctionStatement", + "Effect": "Allow", + "Action": [ + "lambda:InvokeFunction" + ], + "Resource": [ + f"arn:aws:lambda:{self.region_name}:{self.account_number}:function:{self.lambda_function_name}:*" + ], + "Condition": { + "StringEquals": { + "aws:ResourceAccount": f"{self.account_number}" + } + } + } + ] + } + + cw_log_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "logs:CreateLogStream", + "logs:PutLogEvents", + "logs:DescribeLogStreams" + ], + "Resource": "arn:aws:logs:*:*:log-group:/aws/bedrock/invokemodel:*" + } + ] + } + + assume_role_policy_document = { + "Version": "2012-10-17", + + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "bedrock.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] + } + + # combine all policies into one list from policy documents + policies = [ + (self.fm_policy_name, foundation_model_policy_document, 'Policy for accessing foundation model'), + (self.cw_log_policy_name, cw_log_policy_document, 'Policy for writing logs to CloudWatch Logs'), + ] + if self.bucket_names: + policies.append((self.s3_policy_name, s3_policy_document, 'Policy for reading documents from s3')) + if self.secrets_arns: + policies.append((self.sm_policy_name, secrets_manager_policy_document, 'Policy for accessing secret manager')) + if self.chunking_strategy == 'CUSTOM': + policies.append((self.lambda_policy_name, lambda_policy_document, 'Policy for invoking lambda function')) + if self.multi_modal: + policies.append((self.bda_policy_name, bda_policy_document, 'Policy for accessing BDA')) + if self.vector_store == "NEPTUNE_ANALYTICS": + policies.append((self.neptune_policy_name, neptune_policy_name, 'Policy for Neptune Vector Store')) + + # create bedrock execution role + bedrock_kb_execution_role = 
self.iam_client.create_role( + RoleName=self.kb_execution_role_name, + AssumeRolePolicyDocument=json.dumps(assume_role_policy_document), + Description='Amazon Bedrock Knowledge Base Execution Role for accessing OSS, secrets manager and S3', + MaxSessionDuration=3600 + ) + + # create and attach the policies to the bedrock execution role + for policy_name, policy_document, description in policies: + policy = self.iam_client.create_policy( + PolicyName=policy_name, + PolicyDocument=json.dumps(policy_document), + Description=description, + ) + self.iam_client.attach_role_policy( + RoleName=bedrock_kb_execution_role["Role"]["RoleName"], + PolicyArn=policy["Policy"]["Arn"] + ) + + return bedrock_kb_execution_role + + def create_neptune(self): + response = self.neptune_client.create_graph( + graphName=self.graph_name, + tags={ + 'usecase': 'graphRAG' + }, + publicConnectivity=True, + vectorSearchConfiguration={ + 'dimension': embedding_context_dimensions[self.embedding_model] + }, + replicaCount=1, + deletionProtection=True, + provisionedMemory=16 + ) + graph_id = response["id"] + + self.neptune_client.get_graph(graphIdentifier=graph_id)["status"] + try: + while self.neptune_client.get_graph(graphIdentifier=graph_id)["status"] == "CREATING": + print("Graph is getting creating...") + time.sleep(90) + if response["status"] == "CREATED": + print("Graph created successfully") + except KeyError as e: + print(f"Error: 'status' key not found in response dictionary: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + return graph_id + + def create_policies_in_oss(self): + """ + Create OpenSearch Serverless policy and attach it to the Knowledge Base Execution role. 
+ If policy already exists, attaches it + """ + try: + encryption_policy = self.aoss_client.create_security_policy( + name=self.encryption_policy_name, + policy=json.dumps( + { + 'Rules': [{'Resource': ['collection/' + self.vector_store_name], + 'ResourceType': 'collection'}], + 'AWSOwnedKey': True + }), + type='encryption' + ) + except self.aoss_client.exceptions.ConflictException: + encryption_policy = self.aoss_client.get_security_policy( + name=self.encryption_policy_name, + type='encryption' + ) + + try: + network_policy = self.aoss_client.create_security_policy( + name=self.network_policy_name, + policy=json.dumps( + [ + {'Rules': [{'Resource': ['collection/' + self.vector_store_name], + 'ResourceType': 'collection'}], + 'AllowFromPublic': True} + ]), + type='network' + ) + except self.aoss_client.exceptions.ConflictException: + network_policy = self.aoss_client.get_security_policy( + name=self.network_policy_name, + type='network' + ) + + try: + access_policy = self.aoss_client.create_access_policy( + name=self.access_policy_name, + policy=json.dumps( + [ + { + 'Rules': [ + { + 'Resource': ['collection/' + self.vector_store_name], + 'Permission': [ + 'aoss:CreateCollectionItems', + 'aoss:DeleteCollectionItems', + 'aoss:UpdateCollectionItems', + 'aoss:DescribeCollectionItems'], + 'ResourceType': 'collection' + }, + { + 'Resource': ['index/' + self.vector_store_name + '/*'], + 'Permission': [ + 'aoss:CreateIndex', + 'aoss:DeleteIndex', + 'aoss:UpdateIndex', + 'aoss:DescribeIndex', + 'aoss:ReadDocument', + 'aoss:WriteDocument'], + 'ResourceType': 'index' + }], + 'Principal': [self.identity, self.bedrock_kb_execution_role['Role']['Arn']], + 'Description': 'Easy data policy'} + ]), + type='data' + ) + except self.aoss_client.exceptions.ConflictException: + access_policy = self.aoss_client.get_access_policy( + name=self.access_policy_name, + type='data' + ) + + return encryption_policy, network_policy, access_policy + + def create_oss(self): + """ + Create 
OpenSearch Serverless Collection. If already existent, retrieve + """ + try: + collection = self.aoss_client.create_collection(name=self.vector_store_name, type='VECTORSEARCH') + collection_id = collection['createCollectionDetail']['id'] + collection_arn = collection['createCollectionDetail']['arn'] + except self.aoss_client.exceptions.ConflictException: + collection = self.aoss_client.batch_get_collection(names=[self.vector_store_name])['collectionDetails'][0] + collection_id = collection['id'] + collection_arn = collection['arn'] + pp.pprint(collection) + + host = collection_id + '.' + self.region_name + '.aoss.amazonaws.com' + print(host) + + response = self.aoss_client.batch_get_collection(names=[self.vector_store_name]) + while (response['collectionDetails'][0]['status']) == 'CREATING': + print('Creating collection...') + interactive_sleep(30) + response = self.aoss_client.batch_get_collection(names=[self.vector_store_name]) + print('\nCollection successfully created:') + pp.pprint(response["collectionDetails"]) + + try: + self.create_oss_policy_attach_bedrock_execution_role(collection_id) + print("Sleeping for a minute to ensure data access rules have been enforced") + interactive_sleep(60) + except Exception as e: + print("Policy already exists") + pp.pprint(e) + + return host, collection, collection_id, collection_arn + + def create_oss_policy_attach_bedrock_execution_role(self, collection_id): + oss_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "aoss:APIAccessAll" + ], + "Resource": [ + f"arn:aws:aoss:{self.region_name}:{self.account_number}:collection/{collection_id}" + ] + } + ] + } + try: + oss_policy = self.iam_client.create_policy( + PolicyName=self.oss_policy_name, + PolicyDocument=json.dumps(oss_policy_document), + Description='Policy for accessing opensearch serverless', + ) + oss_policy_arn = oss_policy["Policy"]["Arn"] + except self.iam_client.exceptions.EntityAlreadyExistsException: + 
oss_policy_arn = f"arn:aws:iam::{self.account_number}:policy/{self.oss_policy_name}" + + print("Opensearch serverless arn: ", oss_policy_arn) + + self.iam_client.attach_role_policy( + RoleName=self.bedrock_kb_execution_role["Role"]["RoleName"], + PolicyArn=oss_policy_arn + ) + + def create_vector_index(self): + """ + Create OpenSearch Serverless vector index. If existent, ignore + """ + body_json = { + "settings": { + "index.knn": "true", + "number_of_shards": 1, + "knn.algo_param.ef_search": 512, + "number_of_replicas": 0, + }, + "mappings": { + "properties": { + "vector": { + "type": "knn_vector", + "dimension": embedding_context_dimensions[self.embedding_model], + "method": { + "name": "hnsw", + "engine": "faiss", + "space_type": "l2" + }, + }, + "text": { + "type": "text" + }, + "text-metadata": { + "type": "text"} + } + } + } + + try: + response = self.oss_client.indices.create(index=self.index_name, body=json.dumps(body_json)) + print('\nCreating index:') + pp.pprint(response) + interactive_sleep(60) + except RequestError as e: + print(f'Error while trying to create the index, with error {e.error}') + + def create_chunking_strategy_config(self, strategy): + configs = { + + "GRAPH": { + "contextEnrichmentConfiguration": { + "bedrockFoundationModelConfiguration": { + "enrichmentStrategyConfiguration": { + "method": "CHUNK_ENTITY_EXTRACTION" + }, + "modelArn": f"arn:aws:bedrock:{self.region_name}::foundation-model/{self.graph_model}" + }, + "type": "BEDROCK_FOUNDATION_MODEL" + } + }, + + "NONE": { + "chunkingConfiguration": {"chunkingStrategy": "NONE"} + }, + "FIXED_SIZE": { + "chunkingConfiguration": { + "chunkingStrategy": "FIXED_SIZE", + "fixedSizeChunkingConfiguration": { + "maxTokens": 300, + "overlapPercentage": 20 + } + } + }, + "HIERARCHICAL": { + "chunkingConfiguration": { + "chunkingStrategy": "HIERARCHICAL", + "hierarchicalChunkingConfiguration": { + "levelConfigurations": [{"maxTokens": 1500}, {"maxTokens": 300}], + "overlapTokens": 60 + } + } + }, + 
"SEMANTIC": { + "chunkingConfiguration": { + "chunkingStrategy": "SEMANTIC", + "semanticChunkingConfiguration": { + "maxTokens": 300, + "bufferSize": 1, + "breakpointPercentileThreshold": 95} + } + }, + "CUSTOM": { + "customTransformationConfiguration": { + "intermediateStorage": { + "s3Location": { + "uri": f"s3://{self.intermediate_bucket_name}/" + } + }, + "transformations": [ + { + "transformationFunction": { + "transformationLambdaConfiguration": { + "lambdaArn": self.lambda_arn + } + }, + "stepToApply": "POST_CHUNKING" + } + ] + }, + "chunkingConfiguration": {"chunkingStrategy": "NONE"} + } + } + return configs.get(strategy, configs["NONE"]) + + @retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=7) + def create_knowledge_base(self, data_sources): + """ + Create Knowledge Base and its Data Source. If existent, retrieve + """ + if self.graph_id: + storage_configuration = { + "type": "NEPTUNE_ANALYTICS", + "neptuneAnalyticsConfiguration": { + "graphArn": f"arn:aws:neptune-graph:{self.region_name}:{self.account_number}:graph/{self.graph_id}", + "fieldMapping": { + "textField": "text", + "metadataField": "text-metadata" + } + } + } + else: + storage_configuration = { + "type": "OPENSEARCH_SERVERLESS", + "opensearchServerlessConfiguration": { + "collectionArn": self.collection_arn, + "vectorIndexName": self.index_name, + "fieldMapping": { + "vectorField": "vector", + "textField": "text", + "metadataField": "text-metadata" + } + } + } + + # create Knowledge Bases + embedding_model_arn = f"arn:aws:bedrock:{self.region_name}::foundation-model/{self.embedding_model}" + knowledgebase_configuration = { "type": "VECTOR", "vectorKnowledgeBaseConfiguration": { "embeddingModelArn": embedding_model_arn}} + + if self.multi_modal: + supplemental_storageLocation={"storageLocations": [{ "s3Location": { "uri": f"s3://{self.intermediate_bucket_name}"},"type": "S3"}]} + 
knowledgebase_configuration['vectorKnowledgeBaseConfiguration']['supplementalDataStorageConfiguration'] = supplemental_storageLocation + + try: + create_kb_response = self.bedrock_agent_client.create_knowledge_base( + name=self.kb_name, + description=self.kb_description, + roleArn=self.bedrock_kb_execution_role['Role']['Arn'], + knowledgeBaseConfiguration=knowledgebase_configuration, + storageConfiguration=storage_configuration, + ) + kb = create_kb_response["knowledgeBase"] + pp.pprint(kb) + except self.bedrock_agent_client.exceptions.ConflictException: + kbs = self.bedrock_agent_client.list_knowledge_bases(maxResults=100) + kb_id = next((kb['knowledgeBaseId'] for kb in kbs['knowledgeBaseSummaries'] if kb['name'] == self.kb_name), None) + response = self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id) + kb = response['knowledgeBase'] + pp.pprint(kb) + + # create Data Sources + print("Creating Data Sources") + try: + ds_list = self.create_data_sources(kb_id, self.data_sources) + pp.pprint(ds_list) + except self.bedrock_agent_client.exceptions.ConflictException: + ds_id = self.bedrock_agent_client.list_data_sources( + knowledgeBaseId=kb['knowledgeBaseId'], + maxResults=100 + )['dataSourceSummaries'][0]['dataSourceId'] + get_ds_response = self.bedrock_agent_client.get_data_source( + dataSourceId=ds_id, + knowledgeBaseId=kb['knowledgeBaseId'] + ) + ds = get_ds_response["dataSource"] + pp.pprint(ds) + return kb, ds_list + + def create_data_sources(self, kb_id, data_sources): + """ + Create Data Sources for the Knowledge Base. + """ + ds_list=[] + + # create data source for each data source type in list data_sources + for idx, ds in enumerate(data_sources): + + # The data source to ingest documents from, into the OpenSearch serverless knowledge base index + s3_data_source_congiguration = { + "type": "S3", + "s3Configuration":{ + "bucketArn": "", + # "inclusionPrefixes":["*.*"] # you can use this if you want to create a KB using data within s3 prefixes. 
+ } + } + + confluence_data_source_congiguration = { + "confluenceConfiguration": { + "sourceConfiguration": { + "hostUrl": "", + "hostType": "SAAS", + "authType": "", # BASIC | OAUTH2_CLIENT_CREDENTIALS + "credentialsSecretArn": "" + + }, + "crawlerConfiguration": { + "filterConfiguration": { + "type": "PATTERN", + "patternObjectFilter": { + "filters": [ + { + "objectType": "Attachment", + "inclusionFilters": [ + ".*\\.pdf" + ], + "exclusionFilters": [ + ".*private.*\\.pdf" + ] + } + ] + } + } + } + }, + "type": "CONFLUENCE" + } + + sharepoint_data_source_congiguration = { + "sharePointConfiguration": { + "sourceConfiguration": { + "tenantId": "", + "hostType": "ONLINE", + "domain": "domain", + "siteUrls": [], + "authType": "", # BASIC | OAUTH2_CLIENT_CREDENTIALS + "credentialsSecretArn": "" + + }, + "crawlerConfiguration": { + "filterConfiguration": { + "type": "PATTERN", + "patternObjectFilter": { + "filters": [ + { + "objectType": "Attachment", + "inclusionFilters": [ + ".*\\.pdf" + ], + "exclusionFilters": [ + ".*private.*\\.pdf" + ] + } + ] + } + } + } + }, + "type": "SHAREPOINT" + } + + + salesforce_data_source_congiguration = { + "salesforceConfiguration": { + "sourceConfiguration": { + "hostUrl": "", + "authType": "", # BASIC | OAUTH2_CLIENT_CREDENTIALS + "credentialsSecretArn": "" + }, + "crawlerConfiguration": { + "filterConfiguration": { + "type": "PATTERN", + "patternObjectFilter": { + "filters": [ + { + "objectType": "Attachment", + "inclusionFilters": [ + ".*\\.pdf" + ], + "exclusionFilters": [ + ".*private.*\\.pdf" + ] + } + ] + } + } + } + }, + "type": "SALESFORCE" + } + + webcrawler_data_source_congiguration = { + "webConfiguration": { + "sourceConfiguration": { + "urlConfiguration": { + "seedUrls": [] + } + }, + "crawlerConfiguration": { + "crawlerLimits": { + "rateLimit": 50 + }, + "scope": "HOST_ONLY", + "inclusionFilters": [], + "exclusionFilters": [] + } + }, + "type": "WEB" + } + + # Set the data source configuration based on the Data source 
type + + if ds['type'] == "S3": + print(f'{idx +1 } data source: S3') + ds_name = f'{kb_id}-s3' + s3_data_source_congiguration["s3Configuration"]["bucketArn"] = f'arn:aws:s3:::{ds["bucket_name"]}' + # print(s3_data_source_congiguration) + data_source_configuration = s3_data_source_congiguration + + if ds['type'] == "CONFLUENCE": + print(f'{idx +1 } data source: CONFLUENCE') + ds_name = f'{kb_id}-confluence' + confluence_data_source_congiguration['confluenceConfiguration']['sourceConfiguration']['hostUrl'] = ds['hostUrl'] + confluence_data_source_congiguration['confluenceConfiguration']['sourceConfiguration']['authType'] = ds['authType'] + confluence_data_source_congiguration['confluenceConfiguration']['sourceConfiguration']['credentialsSecretArn'] = ds['credentialsSecretArn'] + # print(confluence_data_source_congiguration) + data_source_configuration = confluence_data_source_congiguration + + if ds['type'] == "SHAREPOINT": + print(f'{idx +1 } data source: SHAREPOINT') + ds_name = f'{kb_id}-sharepoint' + sharepoint_data_source_congiguration['sharePointConfiguration']['sourceConfiguration']['tenantId'] = ds['tenantId'] + sharepoint_data_source_congiguration['sharePointConfiguration']['sourceConfiguration']['domain'] = ds['domain'] + sharepoint_data_source_congiguration['sharePointConfiguration']['sourceConfiguration']['authType'] = ds['authType'] + sharepoint_data_source_congiguration['sharePointConfiguration']['sourceConfiguration']['siteUrls'] = ds["siteUrls"] + sharepoint_data_source_congiguration['sharePointConfiguration']['sourceConfiguration']['credentialsSecretArn'] = ds['credentialsSecretArn'] + # print(sharepoint_data_source_congiguration) + data_source_configuration = sharepoint_data_source_congiguration + + + if ds['type'] == "SALESFORCE": + print(f'{idx +1 } data source: SALESFORCE') + ds_name = f'{kb_id}-salesforce' + salesforce_data_source_congiguration['salesforceConfiguration']['sourceConfiguration']['hostUrl'] = ds['hostUrl'] + 
salesforce_data_source_congiguration['salesforceConfiguration']['sourceConfiguration']['authType'] = ds['authType'] + salesforce_data_source_congiguration['salesforceConfiguration']['sourceConfiguration']['credentialsSecretArn'] = ds['credentialsSecretArn'] + # print(salesforce_data_source_congiguration) + data_source_configuration = salesforce_data_source_congiguration + + if ds['type'] == "WEB": + print(f'{idx +1 } data source: WEB') + ds_name = f'{kb_id}-web' + webcrawler_data_source_congiguration['webConfiguration']['sourceConfiguration']['urlConfiguration']['seedUrls'] = ds['seedUrls'] + webcrawler_data_source_congiguration['webConfiguration']['crawlerConfiguration']['inclusionFilters'] = ds['inclusionFilters'] + webcrawler_data_source_congiguration['webConfiguration']['crawlerConfiguration']['exclusionFilters'] = ds['exclusionFilters'] + # print(webcrawler_data_source_congiguration) + data_source_configuration = webcrawler_data_source_congiguration + + + # Create a DataSource in KnowledgeBase + chunking_strategy_configuration = self.create_chunking_strategy_config(self.chunking_strategy) + print("============Chunking config========\n", chunking_strategy_configuration) + vector_ingestion_configuration = chunking_strategy_configuration + + if self.multi_modal: + if self.parser == "BEDROCK_FOUNDATION_MODEL": + parsing_configuration = {"bedrockFoundationModelConfiguration": + {"parsingModality": "MULTIMODAL", "modelArn": f"arn:aws:bedrock:{self.region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0"}, + "parsingStrategy": "BEDROCK_FOUNDATION_MODEL"} + + if self.parser == 'BEDROCK_DATA_AUTOMATION': + parsing_configuration = {"bedrockDataAutomationConfiguration": {"parsingModality": "MULTIMODAL"}, "parsingStrategy": "BEDROCK_DATA_AUTOMATION"} + + vector_ingestion_configuration["parsingConfiguration"] = parsing_configuration + + create_ds_response = self.bedrock_agent_client.create_data_source( + name = ds_name, + description = self.kb_description, + 
knowledgeBaseId = kb_id, + dataSourceConfiguration = data_source_configuration, + vectorIngestionConfiguration = vector_ingestion_configuration + ) + ds = create_ds_response["dataSource"] + pp.pprint(ds) + # self.data_sources[idx]['dataSourceId'].append(ds['dataSourceId']) + ds_list.append(ds) + return ds_list + + + def start_ingestion_job(self): + """ + Start an ingestion job to synchronize data from an S3 bucket to the Knowledge Base + """ + + for idx, ds in enumerate(self.data_sources): + try: + start_job_response = self.bedrock_agent_client.start_ingestion_job( + knowledgeBaseId=self.knowledge_base['knowledgeBaseId'], + dataSourceId=self.data_source[idx]["dataSourceId"] + ) + job = start_job_response["ingestionJob"] + print(f"job {idx+1} started successfully\n") + # pp.pprint(job) + while job['status'] not in ["COMPLETE", "FAILED", "STOPPED"]: + get_job_response = self.bedrock_agent_client.get_ingestion_job( + knowledgeBaseId=self.knowledge_base['knowledgeBaseId'], + dataSourceId=self.data_source[idx]["dataSourceId"], + ingestionJobId=job["ingestionJobId"] + ) + job = get_job_response["ingestionJob"] + pp.pprint(job) + interactive_sleep(40) + + except Exception as e: + print(f"Couldn't start {idx} job.\n") + print(e) + + + def get_knowledge_base_id(self): + """ + Get Knowledge Base Id + """ + pp.pprint(self.knowledge_base["knowledgeBaseId"]) + return self.knowledge_base["knowledgeBaseId"] + + def get_bucket_name(self): + """ + Get the name of the bucket connected with the Knowledge Base Data Source + """ + pp.pprint(f"Bucket connected with KB: {self.bucket_name}") + return self.bucket_name + + def delete_kb(self, delete_s3_bucket=False, delete_iam_roles_and_policies=True, delete_lambda_function=False): + """ + Delete the Knowledge Base resources + Args: + delete_s3_bucket (bool): boolean to indicate if s3 bucket should also be deleted + delete_iam_roles_and_policies (bool): boolean to indicate if IAM roles and Policies should also be deleted + 
delete_lambda_function (bool): boolean to indicate if Lambda function should also be deleted + """ + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore") + + # delete knowledge base and data source. + + # Delete knowledge base and data sources + try: + # First delete all data sources + for ds in self.data_source: + try: + self.bedrock_agent_client.delete_data_source( + dataSourceId=ds["dataSourceId"], + knowledgeBaseId=self.knowledge_base['knowledgeBaseId'] + ) + print(f"Deleted data source {ds['dataSourceId']}") + except self.bedrock_agent_client.exceptions.ResourceNotFoundException: + print(f"Data source {ds['dataSourceId']} not found") + except Exception as e: + print(f"Error deleting data source {ds['dataSourceId']}: {str(e)}") + + # Then delete the knowledge base + self.bedrock_agent_client.delete_knowledge_base( + knowledgeBaseId=self.knowledge_base['knowledgeBaseId'] + ) + print("======== Knowledge base and all data sources deleted =========") + + except self.bedrock_agent_client.exceptions.ResourceNotFoundException as e: + print("Knowledge base not found:", e) + except Exception as e: + print(f"Error during knowledge base deletion: {str(e)}") + + # delete s3 bucket + if delete_s3_bucket==True: + self.delete_s3() + + # delete IAM role and policies + if delete_iam_roles_and_policies: + self.delete_iam_roles_and_policies() + + if delete_lambda_function: + try: + self.delete_lambda_function() + print(f"Deleted Lambda function {self.lambda_function_name}") + except self.lambda_client.exceptions.ResourceNotFoundException: + print(f"Lambda function {self.lambda_function_name} not found.") + + # delete vector index and collection from vector store + if self.vector_store=="OPENSEARCH_SERVERLESS": + try: + self.aoss_client.delete_collection(id=self.collection_id) + self.aoss_client.delete_access_policy( + type="data", + name=self.access_policy_name + ) + self.aoss_client.delete_security_policy( + type="network", + name=self.network_policy_name + ) + 
self.aoss_client.delete_security_policy( + type="encryption", + name=self.encryption_policy_name + ) + print("======== Vector Index, collection and associated policies deleted =========") + except Exception as e: + print(e) + else: + try: + # disable delete protection + response = self.neptune_client.update_graph( + graphIdentifier=self.graph_id, + deletionProtection=False) + print("======= Delete protection disabled before deleting the graph: ", response['deletionProtection']) + + # delete the graph + self.neptune_client.delete_graph( + graphIdentifier=self.graph_id, + skipSnapshot=True) + print("========= Neptune Analytics Graph Deleted =================================") + except Exception as e: + print(e) + + + def delete_iam_roles_and_policies(self): + for role_name in self.roles: + print(f"Found role {role_name}") + try: + self.iam_client.get_role(RoleName=role_name) + except self.iam_client.exceptions.NoSuchEntityException: + print(f"Role {role_name} does not exist") + continue + attached_policies = self.iam_client.list_attached_role_policies(RoleName=role_name)["AttachedPolicies"] + print(f"======Attached policies with role {role_name}========\n", attached_policies) + for attached_policy in attached_policies: + policy_arn = attached_policy["PolicyArn"] + policy_name = attached_policy["PolicyName"] + self.iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn) + print(f"Detached policy {policy_name} from role {role_name}") + if str(policy_arn.split("/")[1]) == "service-role": + print(f"Skipping deletion of service-linked role policy {policy_name}") + else: + self.iam_client.delete_policy(PolicyArn=policy_arn) + print(f"Deleted policy {policy_name} from role {role_name}") + + self.iam_client.delete_role(RoleName=role_name) + print(f"Deleted role {role_name}") + print("======== All IAM roles and policies deleted =========") + + def bucket_exists(bucket): + s3 = boto3.resource('s3') + return s3.Bucket(bucket) in s3.buckets.all() + + def 
delete_s3(self): + """ + Delete the objects contained in the Knowledge Base S3 bucket. + Once the bucket is empty, delete the bucket + """ + s3 = boto3.resource('s3') + bucket_names = self.bucket_names.copy() + if self.intermediate_bucket_name: + bucket_names.append(self.intermediate_bucket_name) + + for bucket_name in bucket_names: + try: + bucket = s3.Bucket(bucket_name) + if bucket in s3.buckets.all(): + print(f"Found bucket {bucket_name}") + # Delete all objects including versions (if versioning enabled) + bucket.object_versions.delete() + bucket.objects.all().delete() + print(f"Deleted all objects in bucket {bucket_name}") + + # Delete the bucket + bucket.delete() + print(f"Deleted bucket {bucket_name}") + else: + print(f"Bucket {bucket_name} does not exist, skipping deletion") + except Exception as e: + print(f"Error deleting bucket {bucket_name}: {str(e)}") + + print("======== S3 bucket deletion process completed =========") + + + def delete_lambda_function(self): + """ + Delete the Knowledge Base Lambda function + Delete the IAM role used by the Knowledge Base Lambda function + """ + # delete lambda function + try: + self.lambda_client.delete_function(FunctionName=self.lambda_function_name) + print(f"======== Lambda function {self.lambda_function_name} deleted =========") + except Exception as e: + print(e) \ No newline at end of file diff --git a/02-samples/README.md b/02-samples/README.md index 52d1108e..60644280 100644 --- a/02-samples/README.md +++ b/02-samples/README.md @@ -11,6 +11,8 @@ | 5 | [Personal Assistant (multi-agent system)](./05-personal-assistant/) | A comprehensive productivity assistant that integrates calendar management, web-search via Perplexity MCP Server, and development assistance through an intelligent multi-agent architecture. | | 6 | [Code Assistant](./06-code-assistant/) | The Code Assistant Agent is not just another toolβ€”it's your personal AI-powered coding companion designed to supercharge your development workflow. 
| 7 | [Fintech Assistant (WhatsApp Integration)](./07-whatsapp-fintech-sample/) | Fintech assistant bot on WhatsApp that coordinates customer support with tools. It loads daily promotions, based on the day of the week. Another agent is responsible for handle credit card operations, being able to load last X days of fake transactions and schedule card payment. | -| 8 | [Data Warehouse Optimizer (SQLite)](./07-data-warehouse-optimizer/) | A multi-agent system simulating a data warehouse query optimizer using SQLite. It includes Analyzer, Rewriter, and Validator agents collaborating to analyze SQL query plans (`EXPLAIN QUERY PLAN`), suggest optimizations, and validate improvements. Powered by Claude 3 Haiku on AWS Bedrock and logs observability with OpenTelemetry. Produces a final JSON report with analysis and validation results. | +| 8 | [Data Warehouse Optimizer (SQLite)](./08-data-warehouse-optimizer/) | A multi-agent system simulating a data warehouse query optimizer using SQLite. It includes Analyzer, Rewriter, and Validator agents collaborating to analyze SQL query plans (`EXPLAIN QUERY PLAN`), suggest optimizations, and validate improvements. Powered by Claude 3 Haiku on AWS Bedrock and logs observability with OpenTelemetry. Produces a final JSON report with analysis and validation results. | | 9 | [Finance-Assistant Swarm Agent Collaboration](./09-finance-assistant-swarm-agent/) | Finance-Assistant Swarm Agent Collaboration is a modular, multi-agent system designed to autonomously generate comprehensive equity research reports from a single stock query. Built using the Strands SDK and powered by Amazon Bedrock, this assistant orchestrates a collaborative swarm of specialized agentsβ€”each responsible for a distinct financial research task including ticker resolution, company profiling, price analytics, financial health assessment, and sentiment analysis. 
| | 10 | [Email Assistant with RAG and Image Generation](./10-multi-modal-email-assistant-agent/) | Multi-modal email assistant demonstrates the power of agent collaboration for enterprise communication, offering a scalable framework for automating professional content creation in domains such as marketing, reporting, and customer engagement. | +| 11 | [Customer Support Agent](./11-customer-support-solar-panel/) | A customer support agent for solar panels. It demonstrates a customer support agent integrating with Amazon Bedrock Knowledge Bases, Amazon Bedrock Guardrails, Amazon DynamoDB, Atlassian JIRA, Mem0 and Langfuse. | +