Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions flexus_client_kit/ckit_erp.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,26 @@ async def erp_table_batch_upsert(
return result


def format_table_meta_text(table_name: str, schema_class: type) -> str:
result = f"Table: erp.{table_name}\n\nColumns:\n"
for field_name, field_type in schema_class.__annotations__.items():
type_str = str(field_type).replace("typing.", "")
meta = schema_class.__dataclass_fields__[field_name].metadata
line = f" • {field_name}: {type_str}"
if meta.get("pkey"):
line += " [PRIMARY KEY]"
if display_name := meta.get("display_name"):
line += f" — {display_name}"
if description := meta.get("description"):
line += f" ({description})"
result += line + "\n"
if examples := meta.get("examples"):
result += f" examples: {examples}\n"
if enum_values := meta.get("enum"):
result += " enum: " + ", ".join(f"{e['value']}" for e in enum_values) + "\n"
return result


def check_record_matches_filters(record: dict, filters, col_names: set = None) -> bool:
if not filters:
return True
Expand Down
24 changes: 24 additions & 0 deletions flexus_client_kit/ckit_integrations_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,30 @@ def _setup_erp(obj, rcx, _tam=tools_and_methods):
integr_need_mongo=True,
))

elif name.startswith("crm"): # "crm[manage_contact, manage_deal, log_activity]"
from flexus_client_kit.integrations import fi_crm
subset = _parse_bracket_list(name)
tool_map = {
"manage_contact": (fi_crm.MANAGE_CRM_CONTACT_TOOL, "handle_manage_crm_contact"),
"manage_deal": (fi_crm.MANAGE_CRM_DEAL_TOOL, "handle_manage_crm_deal"),
"log_activity": (fi_crm.LOG_CRM_ACTIVITY_TOOL, "handle_log_crm_activity"),
}
if subset is None:
subset = list(tool_map.keys())
tools_and_methods = [(tool_map[s][0], tool_map[s][1]) for s in subset]
async def _init_crm(rcx, setup):
return fi_crm.IntegrationCrm(rcx.fclient, rcx.persona.ws_id)
def _setup_crm(obj, rcx, _tam=tools_and_methods):
for tool, method_name in _tam:
rcx.on_tool_call(tool.name)(getattr(obj, method_name))
result.append(IntegrationRecord(
integr_name="crm",
integr_tools=[t for t, _ in tools_and_methods],
integr_init=_init_crm,
integr_setup_handlers=_setup_crm,
integr_prompt=fi_crm.LOG_CRM_ACTIVITIES_PROMPT if "log_activity" in subset else "",
))

else:
raise ValueError(f"Unknown integration {name!r}")
return result
Expand Down
2 changes: 1 addition & 1 deletion flexus_client_kit/erp_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class CrmContact:
ws_id: str
contact_first_name: str = field(metadata={"importance": 1, "display_name": "First Name"})
contact_last_name: str = field(metadata={"importance": 1, "display_name": "Last Name"})
contact_email: str = field(metadata={"importance": 1, "extra_search": True, "display_name": "Email"})
contact_email: Optional[str] = field(default=None, metadata={"importance": 1, "extra_search": True, "display_name": "Email"})
contact_phone: str = field(default="", metadata={"display_name": "Phone"})
contact_id: str = field(default="", metadata={"pkey": True, "display_name": "Contact ID"})
contact_notes: str = field(default="", metadata={"importance": 1, "display": "string_multiline", "display_name": "Notes"})
Expand Down
82 changes: 79 additions & 3 deletions flexus_client_kit/integrations/fi_crm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import time
from typing import Dict, Any, Optional

import gql.transport.exceptions

from flexus_client_kit import ckit_cloudtool, ckit_client, ckit_erp, erp_schema


Expand All @@ -22,7 +24,41 @@
},
)

LOG_CRM_ACTIVITIES_PROMPT = "After each conversation in a messenger platform or outbound message or email, call log_crm_activity with the contact_id, type, direction, and a brief summary. Do this before finishing the task."

LOG_CRM_ACTIVITIES_PROMPT = (
"After the conversation has ended on a messenger platform (not after each message), "
"or after sending an outbound message or email, call log_crm_activity with contact_id, type, direction, and a brief summary. "
"Do this before finishing the task."
)


MANAGE_CRM_CONTACT_TOOL = ckit_cloudtool.CloudTool(
strict=False,
name="manage_crm_contact",
description="Create or patch a CRM contact. Call op='help' to see available fields.",
parameters={
"type": "object",
"properties": {
"op": {"type": "string", "enum": ["help", "create", "patch"], "order": 1},
"args": {"type": "object", "description": "Contact fields; include contact_id for patch", "order": 2},
},
"required": ["op"],
},
)

MANAGE_CRM_DEAL_TOOL = ckit_cloudtool.CloudTool(
strict=False,
name="manage_crm_deal",
description="Create or patch a CRM deal. Call op='help' to see available fields.",
parameters={
"type": "object",
"properties": {
"op": {"type": "string", "enum": ["help", "create", "patch"], "order": 1},
"args": {"type": "object", "description": "Deal fields; include deal_id for patch", "order": 2},
},
"required": ["op"],
},
)


async def find_contact_by_platform_id(fclient, ws_id: str, platform: str, identifier: str) -> Optional[str]:
Expand All @@ -38,6 +74,46 @@ def __init__(self, fclient: ckit_client.FlexusClient, ws_id: str):
self.fclient = fclient
self.ws_id = ws_id

async def handle_manage_crm_contact(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str:
op = args.get("op", "")
if op == "help":
return ckit_erp.format_table_meta_text("crm_contact", erp_schema.CrmContact)
fields = args.get("args", {})
contact_id = str(fields.pop("contact_id", "") or "").strip()
try:
if op == "create":
new_id = await ckit_erp.create_erp_record(self.fclient, "crm_contact", self.ws_id, {"ws_id": self.ws_id, **fields})
return f"✅ Created: {new_id}\n"
elif op == "patch":
if not contact_id:
return "❌ contact_id required for patch\n"
await ckit_erp.patch_erp_record(self.fclient, "crm_contact", self.ws_id, contact_id, fields)
return "✅ Updated\n"
else:
return f"❌ Unknown op: {op}\n"
except gql.transport.exceptions.TransportQueryError as e:
return ckit_cloudtool.gql_error_4xx_to_model_reraise_5xx(e, "fi_crm")

async def handle_manage_crm_deal(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str:
op = args.get("op", "")
if op == "help":
return ckit_erp.format_table_meta_text("crm_deal", erp_schema.CrmDeal)
fields = args.get("args", {})
deal_id = str(fields.pop("deal_id", "") or "").strip()
try:
if op == "create":
new_id = await ckit_erp.create_erp_record(self.fclient, "crm_deal", self.ws_id, {"ws_id": self.ws_id, **fields})
return f"✅ Created: {new_id}\n"
elif op == "patch":
if not deal_id:
return "❌ deal_id required for patch\n"
await ckit_erp.patch_erp_record(self.fclient, "crm_deal", self.ws_id, deal_id, fields)
return "✅ Updated\n"
else:
return f"❌ Unknown op: {op}\n"
except gql.transport.exceptions.TransportQueryError as e:
return ckit_cloudtool.gql_error_4xx_to_model_reraise_5xx(e, "fi_crm")

async def handle_log_crm_activity(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str:
contact_id = args.get("contact_id", "").strip()
activity_type = args.get("activity_type", "").strip()
Expand All @@ -58,5 +134,5 @@ async def handle_log_crm_activity(self, toolcall: ckit_cloudtool.FCloudtoolCall,
"activity_occurred_ts": time.time(),
})
return "✅ Activity logged\n"
except Exception as e:
return f"❌ {e}\n"
except gql.transport.exceptions.TransportQueryError as e:
return ckit_cloudtool.gql_error_4xx_to_model_reraise_5xx(e, "fi_crm")
22 changes: 2 additions & 20 deletions flexus_client_kit/integrations/fi_erp.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
name="erp_table_data",
description=(
"Query ERP table data with filtering. "
"Ensure erp_table_meta() has already been called for the table before querying it."
"Operators: =, !=, >, >=, <, <=, LIKE, ILIKE, CIEQL, IN, NOT_IN, IS_NULL, IS_NOT_NULL. "
"LIKE/ILIKE use SQL wildcards: % matches any chars. CIEQL: Case Insensitive Equal. "
"JSON path: details->subtype:=:welcome. "
Expand Down Expand Up @@ -112,25 +113,6 @@
)


def _format_table_meta_text(table_name: str, schema_class: type) -> str:
result = f"Table: erp.{table_name}\n\nColumns:\n"
for field_name, field_type in schema_class.__annotations__.items():
type_str = str(field_type).replace("typing.", "")
meta = schema_class.__dataclass_fields__[field_name].metadata
line = f" • {field_name}: {type_str}"
if meta.get("pkey"):
line += " [PRIMARY KEY]"
if display_name := meta.get("display_name"):
line += f" — {display_name}"
if description := meta.get("description"):
line += f" ({description})"
result += line + "\n"
if examples := meta.get("examples"):
result += f" examples: {examples}\n"
if enum_values := meta.get("enum"):
result += " enum: " + ", ".join(f"{e['value']}" for e in enum_values) + "\n"
return result


def _rows_to_text(rows: list, table_name: str, safety_valve_chars: int = 5000) -> tuple[str, Optional[str]]:
"""
Expand Down Expand Up @@ -253,7 +235,7 @@ async def handle_erp_meta(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: D
schema_class = erp_schema.ERP_TABLE_TO_SCHEMA[tn]
if result_text:
result_text += "\n\n"
result_text += _format_table_meta_text(tn, schema_class)
result_text += ckit_erp.format_table_meta_text(tn, schema_class)

return result_text

Expand Down
Loading