diff --git a/python-langgraph/README.md b/python-langgraph/README.md new file mode 100644 index 0000000000..fe47171827 --- /dev/null +++ b/python-langgraph/README.md @@ -0,0 +1,44 @@ +# LangGraph: Build Stateful AI Agents in Python + +This folder contains the source code for [LangGraph: Build Stateful AI Agents in Python](https://realpython.com/langgraph-build-stateful-ai-agents-in-python/) + +## Setup + +Create a new virtual environment, and run the following command to install LangGraph and the additional requirements for this project: + +```console +(venv) $ python -m pip install -r requirements.txt +``` + +You'll use `langchain-openai` to interact with OpenAI LLMs, but keep in mind you can use any LLM provider you like with LangGraph and LangChain. You'll use [`pydantic`](https://realpython.com/python-pydantic/) to validate the information your agent parses from emails. + +Before moving forward, if you choose to use OpenAI, make sure you're signed up for an OpenAI account and you have a valid [API key](https://openai.com/api/). You'll need to set the following [environment variable](https://en.wikipedia.org/wiki/Environment_variable) before running any examples in this tutorial: + +```dotenv +OPENAI_API_KEY= +``` + +## Usage + +Once your environment is set up, you can run the final graph agent on an example input with the following code: + +```python +from graphs.email_agent import email_agent_graph +from example_emails import EMAILS + +escalation_criteria = """"There's an immediate risk of electrical, +water, or fire damage""" + +message_with_criteria = f""" +The escalation criteria is: {escalation_criteria} + +Here's the email: +{EMAILS[3]} +""" +message_3 = {"messages": [("human", message_with_criteria)]} + +for chunk in email_agent_graph.stream(message_3, stream_mode="values"): + chunk["messages"][-1].pretty_print() +``` + +See the tutorial for all the details on what's going on here. diff --git a/python-langgraph/chains/binary_questions.py b/python-langgraph/chains/binary_questions.py new file mode 100644 index 0000000000..bf1fe2c830 --- /dev/null +++ b/python-langgraph/chains/binary_questions.py @@ -0,0 +1,31 @@ +from langchain_core.prompts import ChatPromptTemplate +from langchain_openai import ChatOpenAI +from pydantic import BaseModel, Field + + +class BinaryAnswer(BaseModel): + is_true: bool = Field( + description="""Whether the answer to the question is yes or no. + True if yes otherwise false.""" + ) + + +binary_question_prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """ + Answer this question as True for "yes" and False for "no". + No other answers are allowed: + {question} + """, + ) + ] +) + +binary_question_model = ChatOpenAI(model="gpt-4o-mini", temperature=0) + +BINARY_QUESTION_CHAIN = ( + binary_question_prompt + | binary_question_model.with_structured_output(BinaryAnswer) +) diff --git a/python-langgraph/chains/escalation_check.py b/python-langgraph/chains/escalation_check.py new file mode 100644 index 0000000000..739b0cb5a1 --- /dev/null +++ b/python-langgraph/chains/escalation_check.py @@ -0,0 +1,33 @@ +from langchain_core.prompts import ChatPromptTemplate +from langchain_openai import ChatOpenAI +from pydantic import BaseModel, Field + + +class EscalationCheck(BaseModel): + needs_escalation: bool = Field( + description="""Whether the notice requires escalation according + to specified criteria""" + ) + + +escalation_prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """ + Determine whether the following notice received from a regulatory + body requires immediate escalation. Immediate escalation is + required when {escalation_criteria}. + Here's the notice message: + {message} + """, + ) + ] +) + +escalation_check_model = ChatOpenAI(model="gpt-4o-mini", temperature=0) + +ESCALATION_CHECK_CHAIN = ( + escalation_prompt + | escalation_check_model.with_structured_output(EscalationCheck) +) diff --git a/python-langgraph/chains/notice_extraction.py b/python-langgraph/chains/notice_extraction.py new file mode 100644 index 0000000000..1b4ba558f1 --- /dev/null +++ b/python-langgraph/chains/notice_extraction.py @@ -0,0 +1,106 @@ +from datetime import date, datetime + +from langchain_core.prompts import ChatPromptTemplate +from langchain_openai import ChatOpenAI +from pydantic import BaseModel, Field, computed_field + + +class NoticeEmailExtract(BaseModel): + date_of_notice_str: str | None = Field( + default=None, + exclude=True, + repr=False, + description="""The date of the notice (if any) reformatted + to match YYYY-mm-dd""", + ) + entity_name: str | None = Field( + default=None, + description="""The name of the entity sending the notice (if present + in the message)""", + ) + entity_phone: str | None = Field( + default=None, + description="""The phone number of the entity sending the notice + (if present in the message)""", + ) + entity_email: str | None = Field( + default=None, + description="""The email of the entity sending the notice + (if present in the message)""", + ) + project_id: int | None = Field( + default=None, + description="""The project ID (if present in the message) - + must be an integer""", + ) + site_location: str | None = Field( + default=None, + description="""The site location of the project (if present + in the message). Use the full address if possible.""", + ) + violation_type: str | None = Field( + default=None, + description="""The type of violation (if present in the + message)""", + ) + required_changes: str | None = Field( + default=None, + description="""The required changes specified by the entity + (if present in the message)""", + ) + compliance_deadline_str: str | None = Field( + default=None, + exclude=True, + repr=False, + description="""The date that the company must comply (if any) + reformatted to match YYYY-mm-dd""", + ) + max_potential_fine: float | None = Field( + default=None, + description="""The maximum potential fine + (if any)""", + ) + + @staticmethod + def _convert_string_to_date(date: str | None) -> date | None: + try: + return datetime.strptime(date, "%Y-%m-%d").date() + except Exception as e: + print(e) + return None + + @computed_field + @property + def date_of_notice(self) -> date | None: + return self._convert_string_to_date(self.date_of_notice_str) + + @computed_field + @property + def compliance_deadline(self) -> date | None: + return self._convert_string_to_date(self.compliance_deadline_str) + + +info_parse_prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """ + Parse the date of notice, sending entity name, sending entity + phone, sending entity email, project id, site location, violation + type, required changes, compliance deadline, and maximum potential + fine from the message. If any of the fields aren't present, don't + populate them. Try to cast dates into the YYYY-mm-dd format. Don't + populate fields if they're not present in the message. + Here's the notice message: + {message} + """, + ) + ] +) + +notice_parser_model = ChatOpenAI(model="gpt-4o-mini", temperature=0) + +NOTICE_PARSER_CHAIN = ( + info_parse_prompt + | notice_parser_model.with_structured_output(NoticeEmailExtract) +) diff --git a/python-langgraph/example_emails.py b/python-langgraph/example_emails.py new file mode 100644 index 0000000000..d9df1dd89c --- /dev/null +++ b/python-langgraph/example_emails.py @@ -0,0 +1,72 @@ +EMAILS = [ + # Email 1 + """ + Date: October 15, 2024 + From: Occupational Safety and Health Administration (OSHA) + To: Blue Ridge Construction, project 111232345 - Downtown Office Complex + Location: Dallas, TX + + During a recent inspection of your construction site at 123 Main Street, + the following safety violations were identified: + + Lack of fall protection: Workers on scaffolding above 10 feet were + without required harnesses or other fall protection equipment. + Unsafe scaffolding setup: Several scaffolding structures were noted as + lacking secure base plates and bracing, creating potential collapse risks. + Inadequate personal protective equipment (PPE): Multiple workers were found + without proper PPE, including hard hats and safety glasses. + Required Corrective Actions: + + Install guardrails and fall arrest systems on all scaffolding over 10 feet. + Conduct an inspection of all scaffolding structures and reinforce unstable + sections. Ensure all workers on-site are provided with necessary PPE and conduct + safety training on proper usage. + Deadline for Compliance: All violations must be rectified by November 10, 2024. + Failure to comply may result in fines of up to $25,000 per violation. + + Contact: For questions or to confirm compliance, please reach out to the OSHA + regional office at (555) 123-4567 or email compliance.osha@osha.gov. + """, + # Email 2 + """ + From: debby@stack.com + Hey Betsy, + Here's your invoice for $1000 for the cookies you ordered. + """, + # Email 3 + """ + From: tdavid@companyxyz.com + Hi Paul, + We have an issue with the HVAC system your team installed in + apartment 1235. We'd like to request maintenance or a refund. + Thanks, + Terrance + """, + # Email 4 + """ + Date: January 10, 2025 + From: City of Los Angeles Building and Safety Department + To: West Coast Development, project 345678123 - Sunset Luxury + Condominiums + Location: Los Angeles, CA + Following an inspection of your site at 456 Sunset Boulevard, we have + identified the following building code violations: + Electrical Wiring: Exposed wiring was found in the underground parking + garage, posing a safety hazard. Fire Safety: Insufficient fire + extinguishers were available across multiple floors of the structure + under construction. + Structural Integrity: The temporary support beams in the eastern wing + do not meet the load-bearing standards specified in local building codes. + Required Corrective Actions: + Replace or properly secure exposed wiring to meet electrical safety + standards. Install additional fire extinguishers in compliance with + fire code requirements. Reinforce or replace temporary support beams + to ensure structural stability. Deadline for Compliance: Violations + must be addressed no later than February 5, + 2025. Failure to comply may result in + a stop-work order and additional fines. + Contact: For questions or to schedule a re-inspection, please contact + the Building and Safety Department at + (555) 456-7890 or email inspections@lacity.gov. + """, +] diff --git a/python-langgraph/graphs/email_agent.py b/python-langgraph/graphs/email_agent.py new file mode 100644 index 0000000000..ae5c257d62 --- /dev/null +++ b/python-langgraph/graphs/email_agent.py @@ -0,0 +1,144 @@ +import time + +from chains.notice_extraction import NoticeEmailExtract +from graphs.notice_extraction import NOTICE_EXTRACTION_GRAPH +from langchain_core.messages import AIMessage +from langchain_core.tools import tool +from langchain_openai import ChatOpenAI +from langgraph.graph import END, START, MessagesState, StateGraph +from langgraph.prebuilt import ToolNode +from utils.logging_config import LOGGER + + +@tool +def forward_email(email_message: str, send_to_email: str) -> bool: + """ + Forward an email_message to the address of sent_to_email. Returns + true if the email was successful otherwise it wil return false. Note + that this tool only forwards the email to an internal department - + it does not reply to the sender. + """ + + LOGGER.info(f"Forwarding the email to {send_to_email}...") + time.sleep(2) + LOGGER.info("Email forwarded!") + + return True + + +@tool +def send_wrong_email_notification_to_sender( + sender_email: str, correct_department: str +): + """ + Send an email back to the sender informing them that + they have the wrong address. The email should be sent + to the correct_department. + """ + + LOGGER.info(f"Sending wrong email notification to {sender_email}...") + time.sleep(2) + LOGGER.info("Email sent!") + + return True + + +@tool +def extract_notice_data( + email: str, escalation_criteria: str +) -> NoticeEmailExtract: + """ + Extract structured fields from a regulatory notice. + This should be used when the email message comes from + a regulatory body or auditor regarding a property or + construction site that the company works on. + escalation_criteria is a description of which kinds of + notices require immediate escalation. + After calling this tool, you don't need to call any others. + """ + + LOGGER.info("Calling the email notice extraction graph...") + + initial_state = { + "notice_message": email, + "notice_email_extract": None, + "critical_fields_missing": False, + "escalation_text_criteria": escalation_criteria, + "escalation_dollar_criteria": 100_000, + "requires_escalation": False, + "escalation_emails": ["brog@abc.com", "bigceo@company.com"], + } + + results = NOTICE_EXTRACTION_GRAPH.invoke(initial_state) + return results["notice_email_extract"] + + +@tool +def determine_email_action(email: str) -> str: + """ + Call to determine which action should be taken + for an email. Only use when the other tools don't seem + relevant for the email task. Do not call this tool if + you've already called extract_notice_data. + """ + + instructions = """ + If the email appears to be an invoice of any kind or related to + billing, forward the email to the billing and invoices team: + billing@company.com + and send a wrong email notice back to the sender. The correct department is + billing@company.com. + If the email appears to be from a customer, forward to support@company.com, + cdetuma@company.com, and ctu@abc.com. Be sure to forward it to all three + emails listed. + Send a wrong email notice back to the + customer and let them know the correct department is support@company.com. + For any other emails, please send a wrong email notification and try to + infer the correct department from one of billing@company.com, + support@company.com, + humanresources@company.com, and it@company.com. + """ + + return instructions + + +tools = [ + determine_email_action, + forward_email, + send_wrong_email_notification_to_sender, + extract_notice_data, +] +tool_node = ToolNode(tools) + +EMAIL_AGENT_MODEL = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools( + tools +) + + +def call_agent_model_node(state: MessagesState) -> dict[str, list[AIMessage]]: + """Node to call the email agent model""" + messages = state["messages"] + response = EMAIL_AGENT_MODEL.invoke(messages) + return {"messages": [response]} + + +def route_agent_graph_edge(state: MessagesState) -> str: + """Determine whether to call more tools or exit the graph""" + last_message = state["messages"][-1] + if last_message.tool_calls: + return "email_tools" + return END + + +workflow = StateGraph(MessagesState) + +workflow.add_node("email_agent", call_agent_model_node) +workflow.add_node("email_tools", tool_node) + +workflow.add_edge(START, "email_agent") +workflow.add_conditional_edges( + "email_agent", route_agent_graph_edge, ["email_tools", END] +) +workflow.add_edge("email_tools", "email_agent") + +email_agent_graph = workflow.compile() diff --git a/python-langgraph/graphs/notice_extraction.py b/python-langgraph/graphs/notice_extraction.py new file mode 100644 index 0000000000..bc6958765c --- /dev/null +++ b/python-langgraph/graphs/notice_extraction.py @@ -0,0 +1,156 @@ +from typing import TypedDict + +from chains.binary_questions import BINARY_QUESTION_CHAIN +from chains.escalation_check import ESCALATION_CHECK_CHAIN +from chains.notice_extraction import NOTICE_PARSER_CHAIN, NoticeEmailExtract +from langgraph.graph import END, START, StateGraph +from pydantic import EmailStr +from utils.graph_utils import create_legal_ticket, send_escalation_email +from utils.logging_config import LOGGER + + +class GraphState(TypedDict): + notice_message: str + notice_email_extract: NoticeEmailExtract | None + escalation_text_criteria: str + escalation_dollar_criteria: float + requires_escalation: bool + escalation_emails: list[EmailStr] | None + follow_ups: dict[str, bool] | None + current_follow_up: str | None + + +workflow = StateGraph(GraphState) + + +def parse_notice_message_node(state: GraphState) -> GraphState: + """Use the notice parser chain to extract fields from the notice""" + + LOGGER.info("Parsing notice...") + + notice_email_extract = NOTICE_PARSER_CHAIN.invoke( + {"message": state["notice_message"]} + ) + + state["notice_email_extract"] = notice_email_extract + + return state + + +def check_escalation_status_node(state: GraphState) -> GraphState: + """Determine whether a notice needs escalation""" + + LOGGER.info("Determining escalation status...") + + text_check = ESCALATION_CHECK_CHAIN.invoke( + { + "escalation_criteria": state["escalation_text_criteria"], + "message": state["notice_message"], + } + ).needs_escalation + + if ( + text_check + or state["notice_email_extract"].max_potential_fine + >= state["escalation_dollar_criteria"] + ): + state["requires_escalation"] = True + else: + state["requires_escalation"] = False + + return state + + +def send_escalation_email_node(state: GraphState) -> GraphState: + """Send an escalation email""" + + send_escalation_email( + notice_email_extract=state["notice_email_extract"], + escalation_emails=state["escalation_emails"], + ) + + return state + + +def create_legal_ticket_node(state: GraphState) -> GraphState: + """Node to create a legal ticket""" + + follow_up = create_legal_ticket( + current_follow_ups=state.get("follow_ups"), + notice_email_extract=state["notice_email_extract"], + ) + + state["current_follow_up"] = follow_up + + return state + + +def answer_follow_up_question_node(state: GraphState) -> GraphState: + """Answer follow-up questions about the notice using + BINARY_QUESTION_CHAIN""" + + if state["current_follow_up"]: + question = state["current_follow_up"] + " " + state["notice_message"] + + answer = BINARY_QUESTION_CHAIN.invoke({"question": question}) + + if state.get("follow_ups"): + state["follow_ups"][state["current_follow_up"]] = answer + + else: + state["follow_ups"] = {state["current_follow_up"]: answer} + + return state + + +def route_escalation_status_edge(state: GraphState) -> str: + """Determine whether to send an escalation email or create + a legal ticket""" + + if state["requires_escalation"]: + LOGGER.info("Escalation needed!") + return "send_escalation_email" + + LOGGER.info("No escalation needed") + + return "create_legal_ticket" + + +def route_follow_up_edge(state: GraphState) -> str: + """Determine whether a follow-up question is required""" + + if state.get("current_follow_up"): + return "answer_follow_up_question" + + return END + + +workflow.add_node("parse_notice_message", parse_notice_message_node) +workflow.add_node("check_escalation_status", check_escalation_status_node) +workflow.add_node("send_escalation_email", send_escalation_email_node) +workflow.add_node("create_legal_ticket", create_legal_ticket_node) +workflow.add_node("answer_follow_up_question", answer_follow_up_question_node) + +workflow.add_edge(START, "parse_notice_message") +workflow.add_edge("parse_notice_message", "check_escalation_status") +workflow.add_conditional_edges( + "check_escalation_status", + route_escalation_status_edge, + { + "send_escalation_email": "send_escalation_email", + "create_legal_ticket": "create_legal_ticket", + }, +) +workflow.add_conditional_edges( + "create_legal_ticket", + route_follow_up_edge, + { + "answer_follow_up_question": "answer_follow_up_question", + END: END, + }, +) + +workflow.add_edge("send_escalation_email", "create_legal_ticket") +workflow.add_edge("answer_follow_up_question", "create_legal_ticket") + +NOTICE_EXTRACTION_GRAPH = workflow.compile() diff --git a/python-langgraph/pyproject.toml b/python-langgraph/pyproject.toml new file mode 100644 index 0000000000..ef0231a5a0 --- /dev/null +++ b/python-langgraph/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "python-langgraph" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "langchain-openai>=0.3.2", + "langgraph>=0.2.67", + "pydantic[email]>=2.10.6", +] diff --git a/python-langgraph/requirements.txt b/python-langgraph/requirements.txt new file mode 100644 index 0000000000..22d4d9687f --- /dev/null +++ b/python-langgraph/requirements.txt @@ -0,0 +1,106 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile --output-file requirements.txt pyproject.toml +annotated-types==0.7.0 + # via pydantic +anyio==4.8.0 + # via + # httpx + # openai +certifi==2025.1.31 + # via + # httpcore + # httpx + # requests +charset-normalizer==3.4.1 + # via requests +distro==1.9.0 + # via openai +dnspython==2.7.0 + # via email-validator +email-validator==2.2.0 + # via pydantic +h11==0.14.0 + # via httpcore +httpcore==1.0.7 + # via httpx +httpx==0.28.1 + # via + # langgraph-sdk + # langsmith + # openai +idna==3.10 + # via + # anyio + # email-validator + # httpx + # requests +jiter==0.8.2 + # via openai +jsonpatch==1.33 + # via langchain-core +jsonpointer==3.0.0 + # via jsonpatch +langchain-core==0.3.33 + # via + # langchain-openai + # langgraph + # langgraph-checkpoint +langchain-openai==0.3.3 + # via python-langgraph (pyproject.toml) +langgraph==0.2.69 + # via python-langgraph (pyproject.toml) +langgraph-checkpoint==2.0.10 + # via langgraph +langgraph-sdk==0.1.51 + # via langgraph +langsmith==0.3.4 + # via langchain-core +msgpack==1.1.0 + # via langgraph-checkpoint +openai==1.61.0 + # via langchain-openai +orjson==3.10.15 + # via + # langgraph-sdk + # langsmith +packaging==24.2 + # via langchain-core +pydantic==2.10.6 + # via + # python-langgraph (pyproject.toml) + # langchain-core + # langsmith + # openai +pydantic-core==2.27.2 + # via pydantic +pyyaml==6.0.2 + # via langchain-core +regex==2024.11.6 + # via tiktoken +requests==2.32.3 + # via + # langsmith + # requests-toolbelt + # tiktoken +requests-toolbelt==1.0.0 + # via langsmith +sniffio==1.3.1 + # via + # anyio + # openai +tenacity==9.0.0 + # via langchain-core +tiktoken==0.8.0 + # via langchain-openai +tqdm==4.67.1 + # via openai +typing-extensions==4.12.2 + # via + # langchain-core + # openai + # pydantic + # pydantic-core +urllib3==2.3.0 + # via requests +zstandard==0.23.0 + # via langsmith diff --git a/python-langgraph/utils/graph_utils.py b/python-langgraph/utils/graph_utils.py new file mode 100644 index 0000000000..d78d6f6878 --- /dev/null +++ b/python-langgraph/utils/graph_utils.py @@ -0,0 +1,48 @@ +import random +import time + +from chains.notice_extraction import NoticeEmailExtract +from pydantic import EmailStr +from utils.logging_config import LOGGER + + +def send_escalation_email( + notice_email_extract: NoticeEmailExtract, escalation_emails: list[EmailStr] +) -> None: + """Simulate sending escalation emails""" + + LOGGER.info("Sending escalation emails...") + for email in escalation_emails: + time.sleep(1) + LOGGER.info(f"Escalation email sent to {email}") + + +def create_legal_ticket( + current_follow_ups: dict[str, bool] | None, + notice_email_extract: NoticeEmailExtract, +) -> str | None: + """Simulate creating a legal ticket using your company's API.""" + LOGGER.info("Creating legal ticket for notice...") + time.sleep(2) + + follow_ups = [ + None, + """Does this message mention the states of Texas, Georgia, or + New Jersey?""", + "Did this notice involve an issue with FakeAirCo's HVAC system?", + ] + + if current_follow_ups: + follow_ups = [ + f for f in follow_ups if f not in current_follow_ups.keys() + ] + + follow_up = random.choice(follow_ups) + + if not follow_up: + LOGGER.info("Legal ticket created!") + return follow_up + + LOGGER.info("Follow-up is required before creating this ticket") + + return follow_up diff --git a/python-langgraph/utils/logging_config.py b/python-langgraph/utils/logging_config.py new file mode 100644 index 0000000000..2c7ac5a1a0 --- /dev/null +++ b/python-langgraph/utils/logging_config.py @@ -0,0 +1,7 @@ +import logging + +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +LOGGER = logging.getLogger(__name__)