# =====================================================================
# .github/workflows/ai-model-workflow.yml  (new file)
# CI for the model-service container: build the image from the repo root
# (so ./proto is inside the build context) and push it to Amazon ECR,
# tagged with the short commit SHA. ECS deployment steps are kept below
# as comments, intentionally disabled.
# =====================================================================

name: CI/CD for ai-model Service
on:
  push:
    branches: [ "release/1.0.0" ]
    paths:
      - 'model-service/**'

env:
  AWS_REGION: ap-northeast-2
  SERVICE_DIR: model-service
  ECR_REPOSITORY: ai-model-service
#  ECS_CLUSTER_NAME: DevCluster
#  ECS_TASK_DEFINITION_FAMILY: ai-service-td
  CONTAINER_NAME: model-service
  # NOTE(review): this secret is exported into the environment of every
  # step below, but no step consumes it (docker build does not inherit
  # env vars without --build-arg). Consider removing it to minimize
  # secret exposure — TODO confirm nothing in the Dockerfile needs it.
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

permissions:
  id-token: write   # required for OIDC role assumption
  contents: read

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    steps:

      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::490913547024:role/gitactionToECR  # existing role ARN
          aws-region: ${{ env.AWS_REGION }}

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v2

      - name: Set short git commit SHA
        id: vars
        run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT

      - name: Build, tag, and push image to Amazon ECR
        id: build-image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.vars.outputs.sha_short }}
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -f ./${{ env.SERVICE_DIR }}/Dockerfile .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" >> $GITHUB_OUTPUT

# ECS deployment steps — intentionally disabled for now.
#      - name: Download task definition
#        id: download-task-def
#        run: |
#          aws ecs describe-task-definition --task-definition ${{ env.ECS_TASK_DEFINITION_FAMILY }} --query taskDefinition > task-definition.json
#          echo "file=task-definition.json" >> $GITHUB_OUTPUT
#
#      - name: Clean task definition for old SDK
#        id: clean-task-def
#        run: |
#          jq 'del(.enableFaultInjection)' ${{ steps.download-task-def.outputs.file }} > cleaned-task-def.json
#          echo "file=cleaned-task-def.json" >> $GITHUB_OUTPUT
#
#      - name: Render Amazon ECS task definition
#        id: render-task-def
#        uses: aws-actions/amazon-ecs-render-task-definition@v1
#        with:
#          task-definition: ${{ steps.clean-task-def.outputs.file }}
#          container-name: ${{ env.CONTAINER_NAME }}
#          image: ${{ steps.build-image.outputs.image }}
#
#      - name: Deploy Amazon ECS task definition
#        uses: aws-actions/amazon-ecs-deploy-task-definition@v1
#        with:
#          task-definition: ${{ steps.render-task-def.outputs.task-definition }}
#          service: ${{ env.ECR_REPOSITORY }}
#          cluster: ${{ env.ECS_CLUSTER_NAME }}
#          wait-for-service-stability: true

# =====================================================================
# .github/workflows/ai-workflow.yml  (modified — resolved new state)
# Same build-and-push pipeline for msa-ai-service; the live ECS deploy
# steps were commented out and a paths filter was added so this workflow
# only runs when msa-ai-service/** changes.
# NOTE(review): the two workflows are near-identical copies; consider a
# single reusable workflow (workflow_call) parameterized by service dir
# and ECR repository to avoid drift.
# =====================================================================

name: CI/CD for ai Service
on:
  push:
    branches: [ "release/1.0.0" ]
    paths:
      - 'msa-ai-service/**'

env:
  AWS_REGION: ap-northeast-2
  SERVICE_DIR: msa-ai-service
  ECR_REPOSITORY: ai-service
#  ECS_CLUSTER_NAME: DevCluster
#  ECS_TASK_DEFINITION_FAMILY: ai-service-td
  CONTAINER_NAME: msa-ai-service
  # NOTE(review): unused by the steps below — same exposure concern as in
  # ai-model-workflow.yml.
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

permissions:
  id-token: write
  contents: read

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    steps:

      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::490913547024:role/gitactionToECR  # existing role ARN
          aws-region: ${{ env.AWS_REGION }}

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v2

      - name: Set short git commit SHA
        id: vars
        run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT

      - name: Build, tag, and push image to Amazon ECR
        id: build-image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.vars.outputs.sha_short }}
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -f ./${{ env.SERVICE_DIR }}/Dockerfile .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" >> $GITHUB_OUTPUT

# ECS deployment steps — intentionally disabled (previously active).
#      - name: Download task definition
#        id: download-task-def
#        run: |
#          aws ecs describe-task-definition --task-definition ${{ env.ECS_TASK_DEFINITION_FAMILY }} --query taskDefinition > task-definition.json
#          echo "file=task-definition.json" >> $GITHUB_OUTPUT
#
#      - name: Clean task definition for old SDK
#        id: clean-task-def
#        run: |
#          jq 'del(.enableFaultInjection)' ${{ steps.download-task-def.outputs.file }} > cleaned-task-def.json
#          echo "file=cleaned-task-def.json" >> $GITHUB_OUTPUT
#
#      - name: Render Amazon ECS task definition
#        id: render-task-def
#        uses: aws-actions/amazon-ecs-render-task-definition@v1
#        with:
#          task-definition: ${{ steps.clean-task-def.outputs.file }}
#          container-name: ${{ env.CONTAINER_NAME }}
#          image: ${{ steps.build-image.outputs.image }}
#
#      - name: Deploy Amazon ECS task definition
#        uses: aws-actions/amazon-ecs-deploy-task-definition@v1
#        with:
#          task-definition: ${{ steps.render-task-def.outputs.task-definition }}
#          service: ${{ env.ECR_REPOSITORY }}
#          cluster: ${{ env.ECS_CLUSTER_NAME }}
#          wait-for-service-stability: true

# --- airflow-service/Dockerfile, airflow-service/dags/review_pipeline.py,
# --- airflow-service/requirements.txt: added as empty placeholder files.
# (patch continues below with docker-compose.yml)
a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..b62bd69 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,44 @@ +version: "3.9" + +services: + redis: + image: redis:7 + ports: + - "6379:6379" + + mongo: + image: mongo:6 + ports: + - "27017:27017" + command: ["mongod", "--replSet", "rs0", "--bind_ip_all"] + volumes: + - mongo_data:/data/db + + model-service: + build: + context: . + dockerfile: model-service/Dockerfile + container_name: model-service + ports: + - "50051:50051" + env_file: + - .env.dev + depends_on: + - mongo + + msa-ai-service: + build: + context: . + dockerfile: msa-ai-service/Dockerfile + container_name: msa-ai-service + ports: + - "8000:8000" + env_file: + - .env.dev + depends_on: + - redis + - mongo + - model-service + +volumes: + mongo_data: diff --git a/infra/ai-task.json b/infra/ai-task.json new file mode 100644 index 0000000..e69de29 diff --git a/infra/airflow-task.json b/infra/airflow-task.json new file mode 100644 index 0000000..e69de29 diff --git a/infra/model-task.json b/infra/model-task.json new file mode 100644 index 0000000..e69de29 diff --git a/model-service/Dockerfile b/model-service/Dockerfile new file mode 100644 index 0000000..8293709 --- /dev/null +++ b/model-service/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.10-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + ca-certificates \ + curl \ + && rm -rf /var/lib/apt/lists/* + +COPY model-service/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# proto 복사 & gRPC 코드 생성 → 루트(/app)에 생성 +COPY proto ./proto +RUN python -m grpc_tools.protoc \ + -I./proto \ + --python_out=. \ + --grpc_python_out=. 
\ + ./proto/model.proto + +# 서비스 코드 복사 +COPY model-service/app ./app + +# PYTHONPATH 설정 +ENV PYTHONPATH=/app + +# gRPC 서버 실행 +CMD ["python", "-m", "app.main"] + \ No newline at end of file diff --git a/model-service/app/__init__.py b/model-service/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/model-service/app/core/config.py b/model-service/app/core/config.py new file mode 100644 index 0000000..c1f4294 --- /dev/null +++ b/model-service/app/core/config.py @@ -0,0 +1,34 @@ +import os +from dotenv import load_dotenv + +# ENV 구분 +ENV = os.getenv("ENV", "dev") + +if ENV == "prod": + load_dotenv(".env.prod") +else: + load_dotenv(".env.dev") + +# Mongo +MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") +MONGODB_NAME = os.getenv("MONGODB_NAME", "ai_service_dev") + +# Redis +REDIS_HOST = os.getenv("REDIS_HOST", "redis") +REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) +REDIS_REQUEST_STREAM = os.getenv("REDIS_REQUEST_STREAM", "chat:requests") +REDIS_RESPONSE_STREAM = os.getenv("REDIS_RESPONSE_STREAM", "chat:responses") + +# OpenAI +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") + +# gRPC +MODEL_SERVICE_HOST = os.getenv("MODEL_SERVICE_HOST", "model-service") +MODEL_SERVICE_PORT = int(os.getenv("MODEL_SERVICE_PORT", 50051)) + +# Vector Search (prod용) +VECTOR_INDEX_NAME = os.getenv("VECTOR_INDEX_NAME", "reviews_embedding_index") + +# 라벨 정의 +REVIEW_LABELS = ["quantity", "size", "sweet", "salty", "spicy", "deep", "sour"] +POLARITY_LABELS = ["POSITIVE", "NEGATIVE"] diff --git a/model-service/app/db/__init__.py b/model-service/app/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/model-service/app/db/mongodb.py b/model-service/app/db/mongodb.py new file mode 100644 index 0000000..025d586 --- /dev/null +++ b/model-service/app/db/mongodb.py @@ -0,0 +1,8 @@ +from pymongo import MongoClient +from app.core.config import MONGODB_URI, MONGODB_NAME + +client = MongoClient(MONGODB_URI) +db = client[MONGODB_NAME] + +def 
get_collection(name: str): + return db[name] # 지정된 콜렉션 반환 diff --git a/model-service/app/main.py b/model-service/app/main.py new file mode 100644 index 0000000..e09f0bc --- /dev/null +++ b/model-service/app/main.py @@ -0,0 +1,58 @@ +import grpc +from concurrent import futures +import model_pb2, model_pb2_grpc +from app.ml.embedding_model import embedding_model +from app.db.mongodb import get_collection +from app.services.labeling_service import LabelingService + +queries_embedding_col = get_collection("queries_embedding") +reviews_embedding_col = get_collection("reviews_embedding") + +labeling_service = LabelingService() + +class ModelService(model_pb2_grpc.ModelServiceServicer): + def GetEmbedding(self, request, context): + text = request.text + meta = dict(request.meta) + + # 1. KoSimCSE 임베딩 + embedding = embedding_model.encode([text])[0] + + # 2. OpenAI 라벨링 + mode = "review" if meta.get("type") == "review" else "question" + label, polarity = labeling_service.embed_and_label(text, mode) + + # 3. 
MongoDB 저장 + if meta.get("type") == "query": + queries_embedding_col.insert_one({ + "_id": meta["query_id"], + "store_id": meta["store_id"], + "menu_id": meta["menu_id"], + "query": text, + "embedding": embedding, + "label": label, + "polarity": polarity + }) + else: + reviews_embedding_col.insert_one({ + "_id": meta["review_id"], + "store_id": meta["store_id"], + "menu_id": meta["menu_id"], + "review": text, + "embedding": embedding, + "label": label, + "polarity": polarity + }) + + return model_pb2.EmbeddingResponse(status="ok") + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + model_pb2_grpc.add_ModelServiceServicer_to_server(ModelService(), server) + server.add_insecure_port("[::]:50051") + server.start() + print("🚀 model-service gRPC server started at 50051") + server.wait_for_termination() + +if __name__ == "__main__": + serve() diff --git a/model-service/app/ml/embedding_model.py b/model-service/app/ml/embedding_model.py new file mode 100644 index 0000000..0282a8c --- /dev/null +++ b/model-service/app/ml/embedding_model.py @@ -0,0 +1,18 @@ +# Huggingface로 모델 로드하기 + +from sentence_transformers import SentenceTransformer +from typing import List +import os + +class LocalEmbeddingModel: + def __init__(self, model_name: str = None): + # 환경변수에서 모델 경로/이름 불러오기 (없으면 기본값) + model_name = model_name or os.getenv("EMBEDDING_MODEL", "BM-K/KoSimCSE-roberta") + self.model = SentenceTransformer(model_name) + + def encode(self, texts: List[str]) -> List[List[float]]: + """여러 문장을 벡터로 변환""" + return self.model.encode(texts, convert_to_numpy=True).tolist() + +# 싱글톤 객체 생성 (앱 전체에서 공유) +embedding_model = LocalEmbeddingModel() diff --git a/model-service/app/model_pb2.py b/model-service/app/model_pb2.py new file mode 100644 index 0000000..bead0a6 --- /dev/null +++ b/model-service/app/model_pb2.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# NO CHECKED-IN PROTOBUF GENCODE +# source: model.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'model.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bmodel.proto\x12\x05model\"~\n\x10\x45mbeddingRequest\x12\x0c\n\x04text\x18\x01 \x01(\t\x12/\n\x04meta\x18\x02 \x03(\x0b\x32!.model.EmbeddingRequest.MetaEntry\x1a+\n\tMetaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"#\n\x11\x45mbeddingResponse\x12\x0e\n\x06status\x18\x01 \x01(\t2Q\n\x0cModelService\x12\x41\n\x0cGetEmbedding\x12\x17.model.EmbeddingRequest\x1a\x18.model.EmbeddingResponseb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'model_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_EMBEDDINGREQUEST_METAENTRY']._loaded_options = None + _globals['_EMBEDDINGREQUEST_METAENTRY']._serialized_options = b'8\001' + _globals['_EMBEDDINGREQUEST']._serialized_start=22 + _globals['_EMBEDDINGREQUEST']._serialized_end=148 + _globals['_EMBEDDINGREQUEST_METAENTRY']._serialized_start=105 + _globals['_EMBEDDINGREQUEST_METAENTRY']._serialized_end=148 + _globals['_EMBEDDINGRESPONSE']._serialized_start=150 + _globals['_EMBEDDINGRESPONSE']._serialized_end=185 + _globals['_MODELSERVICE']._serialized_start=187 + _globals['_MODELSERVICE']._serialized_end=268 +# 
@@protoc_insertion_point(module_scope) diff --git a/model-service/app/model_pb2_grpc.py b/model-service/app/model_pb2_grpc.py new file mode 100644 index 0000000..780ade7 --- /dev/null +++ b/model-service/app/model_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +import model_pb2 as model__pb2 + +GRPC_GENERATED_VERSION = '1.74.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in model_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class ModelServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.GetEmbedding = channel.unary_unary( + '/model.ModelService/GetEmbedding', + request_serializer=model__pb2.EmbeddingRequest.SerializeToString, + response_deserializer=model__pb2.EmbeddingResponse.FromString, + _registered_method=True) + + +class ModelServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def GetEmbedding(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ModelServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'GetEmbedding': grpc.unary_unary_rpc_method_handler( + servicer.GetEmbedding, + request_deserializer=model__pb2.EmbeddingRequest.FromString, + response_serializer=model__pb2.EmbeddingResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'model.ModelService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('model.ModelService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. 
+class ModelService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def GetEmbedding(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/model.ModelService/GetEmbedding', + model__pb2.EmbeddingRequest.SerializeToString, + model__pb2.EmbeddingResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/model-service/app/services/labeling_service.py b/model-service/app/services/labeling_service.py new file mode 100644 index 0000000..f721abf --- /dev/null +++ b/model-service/app/services/labeling_service.py @@ -0,0 +1,28 @@ +import json +import openai +from app.core.config import REVIEW_LABELS, POLARITY_LABELS, OPENAI_API_KEY + +openai.api_key = OPENAI_API_KEY + +class LabelingService: + def embed_and_label(self, text: str, mode: str): + """ + 텍스트를 KoSimCSE 임베딩 + OpenAI 라벨링으로 변환 + """ + target_word = "리뷰" if mode == "review" else "질문" + + prompt = f""" + 너는 리뷰 분석기야. {target_word} 문장을 보고 아래 후보 중 라벨과 polarity를 정해줘. + 라벨 후보: {", ".join(REVIEW_LABELS)} + 폴라리티 후보: {", ".join(POLARITY_LABELS)} + 출력 형식: JSON {{ "label": "...", "polarity": "..." 
}} + {target_word}: {text} + """ + + resp = openai.chat.completions.create( + model="gpt-4.1-mini", + messages=[{"role": "user", "content": prompt}], + response_format={"type": "json_object"} + ) + parsed = json.loads(resp.choices[0].message.content) + return parsed["label"], parsed["polarity"] diff --git a/model-service/requirements.txt b/model-service/requirements.txt new file mode 100644 index 0000000..1cf9b26 --- /dev/null +++ b/model-service/requirements.txt @@ -0,0 +1,8 @@ +grpcio +grpcio-tools +pymongo +python-dotenv +sentence-transformers +openai +torch +transformers \ No newline at end of file diff --git a/msa-ai-service/Dockerfile b/msa-ai-service/Dockerfile index 4b8f6a5..c13e256 100644 --- a/msa-ai-service/Dockerfile +++ b/msa-ai-service/Dockerfile @@ -2,18 +2,68 @@ FROM python:3.10-slim WORKDIR /app -# SSL 인증서와 필요한 툴 설치 +# 시스템 기본 툴 설치 RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + python3-dev \ + python3-pip \ ca-certificates \ curl \ && rm -rf /var/lib/apt/lists/* - -COPY requirements.txt . + +# requirements 설치 +COPY msa-ai-service/requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -# RUN python -c "from sentence_transformers import SentenceTransformer; \ -# SentenceTransformer('BM-K/KoSimCSE-roberta').save('./model')" +# grpcio-tools 설치 (proto 컴파일용) +RUN pip install grpcio grpcio-tools + +# proto 복사 & 컴파일 (생성 결과는 /app 루트에 위치) +COPY proto ./proto +RUN python -m grpc_tools.protoc \ + -I./proto \ + --python_out=. \ + --grpc_python_out=. \ + ./proto/model.proto + +# 서비스 코드 복사 +COPY msa-ai-service/app ./app + +# PYTHONPATH 잡아주기 +ENV PYTHONPATH=/app + +# 기본 실행 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] + + +# FROM python:3.10-slim + +# WORKDIR /app + +# # SSL 인증서와 필요한 툴 설치 +# RUN apt-get update && apt-get install -y --no-install-recommends \ +# ca-certificates \ +# curl \ +# python3-dev \ +# && rm -rf /var/lib/apt/lists/* + +# COPY requirements.txt . 
+# RUN pip install --no-cache-dir -r requirements.txt +# # RUN python -c "from sentence_transformers import SentenceTransformer; \ +# # SentenceTransformer('BM-K/KoSimCSE-roberta').save('./model')" + + +# # 서비스 코드 복사 +# COPY app ./app + +# # proto 복사 (루트에서 proto만 가져옴) +# COPY ../proto ./proto -COPY . . +# # proto 파일 컴파일 (model.proto → model_pb2.py / model_pb2_grpc.py) +# RUN python -m grpc_tools.protoc \ +# -I. \ +# --python_out=. \ +# --grpc_python_out=. \ +# app/protos/model.proto -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] +# CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/msa-ai-service/app/core/config.py b/msa-ai-service/app/core/config.py index 6101958..2efaa2e 100644 --- a/msa-ai-service/app/core/config.py +++ b/msa-ai-service/app/core/config.py @@ -11,8 +11,19 @@ MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017") -MONGODB_NAME = os.getenv("MONGODB_NAME", "ai_service_db") +MONGODB_NAME = os.getenv("MONGODB_NAME", "ai_service_dev") + +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) +REDIS_REQUEST_STREAM = os.getenv("REDIS_REQUEST_STREAM", "chat:requests") +REDIS_RESPONSE_STREAM = os.getenv("REDIS_RESPONSE_STREAM", "chat:responses") + OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") REVIEW_LABELS = ["quantity", "size", "sweet", "salty", "spicy", "deep", "sour"] -POLARITY_LABELS = ["POSITIVE", "NEGATIVE"] \ No newline at end of file +POLARITY_LABELS = ["POSITIVE", "NEGATIVE"] + +MODEL_SERVICE_HOST = os.getenv("MODEL_SERVICE_HOST", "model-service") +MODEL_SERVICE_PORT = int(os.getenv("MODEL_SERVICE_PORT", 50051)) + +VECTOR_INDEX_NAME = os.getenv("VECTOR_INDEX_NAME", "reviews_embedding_index") \ No newline at end of file diff --git a/msa-ai-service/app/main.py b/msa-ai-service/app/main.py index a9d773b..d4abfcf 100644 --- a/msa-ai-service/app/main.py +++ b/msa-ai-service/app/main.py @@ -1,13 +1,14 @@ -# (Spring + Mongo + Change Stream 자동 
실행) - +# 임시테스트요오오 from fastapi import FastAPI from contextlib import asynccontextmanager -from app.routes import health, qa_router -from app.services.change_stream_service import start_watchers -from app.core.config import ENV -if ENV == "dev": from app.routes import seed_router import logging import sys +import asyncio +from app.routes import health +from app.core.config import ENV +from app.services.redis_service import start_redis_consumer +from app.services.review_watcher import watch_reviews + logging.basicConfig( level=logging.INFO, @@ -20,16 +21,62 @@ @asynccontextmanager async def lifespan(app: FastAPI): - # Mongo Change Stream 시작 - start_watchers() + # Redis Consumer 시작 + start_redis_consumer() + + # Mongo Change Stream Watcher 시작 + asyncio.create_task(watch_reviews()) + yield + app = FastAPI(title="MSA AI Service", lifespan=lifespan) # 공통 라우터 app.include_router(health.router, prefix="/ai", tags=["health"]) -app.include_router(qa_router.router, prefix="/ai", tags=["qa"]) # dev 환경일 때만 seed_router 등록 if ENV == "dev": + from app.routes import seed_router app.include_router(seed_router.router, prefix="/ai", tags=["seed"]) + + +# # (Spring + Mongo + Redis Stream 자동 실행) + +# from fastapi import FastAPI +# from contextlib import asynccontextmanager +# from app.routes import health, qa_router +# from app.core.config import ENV +# if ENV == "dev": from app.routes import seed_router +# import logging +# import sys +# from app.services.redis_service import start_redis_consumer +# from app.services.redis_service import read_request, add_response +# import asyncio + + +# logging.basicConfig( +# level=logging.INFO, +# format="%(asctime)s %(levelname)-5s %(name)s - %(message)s", +# datefmt="%Y-%m-%d %H:%M:%S", +# stream=sys.stdout, +# ) +# logger = logging.getLogger(__name__) + + +# @asynccontextmanager +# async def lifespan(app: FastAPI): + +# # Redis Consumer 시작 +# start_redis_consumer() +# yield + +# app = FastAPI(title="MSA AI Service", lifespan=lifespan) + +# # 공통 
라우터 +# app.include_router(health.router, prefix="/ai", tags=["health"]) +# app.include_router(qa_router.router, prefix="/ai", tags=["qa"]) + +# # dev 환경일 때만 seed_router 등록 +# if ENV == "dev": +# app.include_router(seed_router.router, prefix="/ai", tags=["seed"]) diff --git a/msa-ai-service/app/ml/embedding_model.py b/msa-ai-service/app/ml/embedding_model.py deleted file mode 100644 index 661dc97..0000000 --- a/msa-ai-service/app/ml/embedding_model.py +++ /dev/null @@ -1 +0,0 @@ -# Huggingface로 모델 로드하기 \ No newline at end of file diff --git a/msa-ai-service/app/model_pb2.py b/msa-ai-service/app/model_pb2.py new file mode 100644 index 0000000..bead0a6 --- /dev/null +++ b/msa-ai-service/app/model_pb2.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: model.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'model.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bmodel.proto\x12\x05model\"~\n\x10\x45mbeddingRequest\x12\x0c\n\x04text\x18\x01 \x01(\t\x12/\n\x04meta\x18\x02 \x03(\x0b\x32!.model.EmbeddingRequest.MetaEntry\x1a+\n\tMetaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"#\n\x11\x45mbeddingResponse\x12\x0e\n\x06status\x18\x01 \x01(\t2Q\n\x0cModelService\x12\x41\n\x0cGetEmbedding\x12\x17.model.EmbeddingRequest\x1a\x18.model.EmbeddingResponseb\x06proto3') + +_globals = globals() 
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'model_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_EMBEDDINGREQUEST_METAENTRY']._loaded_options = None + _globals['_EMBEDDINGREQUEST_METAENTRY']._serialized_options = b'8\001' + _globals['_EMBEDDINGREQUEST']._serialized_start=22 + _globals['_EMBEDDINGREQUEST']._serialized_end=148 + _globals['_EMBEDDINGREQUEST_METAENTRY']._serialized_start=105 + _globals['_EMBEDDINGREQUEST_METAENTRY']._serialized_end=148 + _globals['_EMBEDDINGRESPONSE']._serialized_start=150 + _globals['_EMBEDDINGRESPONSE']._serialized_end=185 + _globals['_MODELSERVICE']._serialized_start=187 + _globals['_MODELSERVICE']._serialized_end=268 +# @@protoc_insertion_point(module_scope) diff --git a/msa-ai-service/app/model_pb2_grpc.py b/msa-ai-service/app/model_pb2_grpc.py new file mode 100644 index 0000000..780ade7 --- /dev/null +++ b/msa-ai-service/app/model_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +import model_pb2 as model__pb2 + +GRPC_GENERATED_VERSION = '1.74.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in model_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' 
+ ) + + +class ModelServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.GetEmbedding = channel.unary_unary( + '/model.ModelService/GetEmbedding', + request_serializer=model__pb2.EmbeddingRequest.SerializeToString, + response_deserializer=model__pb2.EmbeddingResponse.FromString, + _registered_method=True) + + +class ModelServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def GetEmbedding(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ModelServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'GetEmbedding': grpc.unary_unary_rpc_method_handler( + servicer.GetEmbedding, + request_deserializer=model__pb2.EmbeddingRequest.FromString, + response_serializer=model__pb2.EmbeddingResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'model.ModelService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('model.ModelService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. 
+class ModelService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def GetEmbedding(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/model.ModelService/GetEmbedding', + model__pb2.EmbeddingRequest.SerializeToString, + model__pb2.EmbeddingResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/msa-ai-service/app/models/qa.py b/msa-ai-service/app/models/qa.py index 6cc8b8a..9be8dc6 100644 --- a/msa-ai-service/app/models/qa.py +++ b/msa-ai-service/app/models/qa.py @@ -1,21 +1,42 @@ -# Pydantic 스키마 (qa_queries, qa_answers) +# Pydantic 스키마 (queries, answers) # Swagger 문서화/타입 검증용 샘플 -from pydantic import BaseModel +from pydantic import BaseModel, Field +from typing import List, Optional from datetime import datetime -class QAQuery(BaseModel): - request_id: str + +class QueryItem(BaseModel): + query_id: str + query: str + + +class MenuQueries(BaseModel): menu_id: str - question: str + queries: List[QueryItem] = [] + + +class QueryDocument(BaseModel): + id: str = Field(..., alias="_id") # store_id + menus: List[MenuQueries] = [] + updated_at: datetime + -class QAAnswer(BaseModel): - request_id: str +class AnswerDocument(BaseModel): + id: str = Field(..., alias="_id") # question_id store_id: str store_name: str menu_id: str menu_name: str answer: str label: str - polarity: str + created_at: datetime + + +class QueryEmbeddingDocument(BaseModel): + id: str = Field(..., alias="_id") # request_id + menu_id: str + query: str + label: str + embedding: List[float] created_at: datetime diff --git a/msa-ai-service/app/models/review.py b/msa-ai-service/app/models/review.py index 2b27305..9945d74 100644 --- 
a/msa-ai-service/app/models/review.py +++ b/msa-ai-service/app/models/review.py @@ -1,21 +1,35 @@ # Pydantic 스키마 (reviews, reviews_embedding) -from pydantic import BaseModel -from datetime import datetime +from pydantic import BaseModel, Field from typing import List +from datetime import datetime + -class Review(BaseModel): +class ReviewItem(BaseModel): review_id: str text: str created_at: datetime -class Menu(BaseModel): + +class MenuReviews(BaseModel): menu_id: str menu_name: str - reviews: List[Review] = [] + reviews: List[ReviewItem] = [] + -class StoreReview(BaseModel): - _id: str +class ReviewDocument(BaseModel): + id: str = Field(..., alias="_id") # store_id store_name: str - menus: List[Menu] + menus: List[MenuReviews] = [] + updated_at: datetime + + +class ReviewEmbeddingDocument(BaseModel): + id: str = Field(..., alias="_id") # review_id + store_id: str + menu_id: str + review: str + label: str + polarity: str + embedding: List[float] updated_at: datetime diff --git a/msa-ai-service/app/routes/qa_router.py b/msa-ai-service/app/routes/qa_router.py index 986b1fc..bb54d78 100644 --- a/msa-ai-service/app/routes/qa_router.py +++ b/msa-ai-service/app/routes/qa_router.py @@ -2,14 +2,14 @@ # API 트리거 질문 -> 응답 처리 가능 """ -1. (Change Stream 또는 직접 호출 시) qa_queries 문서 전체 읽기 +1. (Change Stream 또는 직접 호출 시) queries 문서 전체 읽기 2. 각 질문(request_id) 확인: queries_embedding에 없으면 → 새 질문 처리 시작 3. queries_embedding 생성 (label, polarity 포함) -4. qa_answers에 같은 store_id + menu_id + label + polarity 답변이 있으면: - qa_answers.created_at < reviews_denorm.updated_at → 새로 생성 +4. answers에 같은 store_id + menu_id + label + polarity 답변이 있으면: + answers.created_at < reviews.updated_at → 새로 생성 아니면 재사용 -5. 최종 답변 qa_answers에 저장 +5. 
최종 답변 answers에 저장 """ from fastapi import APIRouter @@ -20,7 +20,7 @@ router = APIRouter() -qa_queries_col = get_collection("qa_queries") +queries_col = get_collection("queries") queries_embedding_col = get_collection("queries_embedding") @router.get("/process-queries") @@ -29,7 +29,7 @@ async def process_queries(limit: int = 10): 수동으로 QA 파이프라인 실행 (Change Stream 대신 직접 확인할 때 사용) """ results = [] - docs = qa_queries_col.find().limit(limit) + docs = queries_col.find().limit(limit) for doc in docs: store_id = doc["_id"] diff --git a/msa-ai-service/app/routes/seed_router.py b/msa-ai-service/app/routes/seed_router.py index 2e16ae4..421caab 100644 --- a/msa-ai-service/app/routes/seed_router.py +++ b/msa-ai-service/app/routes/seed_router.py @@ -8,8 +8,8 @@ router = APIRouter() -qa_queries_col = get_collection("qa_queries") -reviews_denorm_col = get_collection("reviews_denorm") +queries_col = get_collection("queries") +reviews_col = get_collection("reviews") # 고정 UUID 생성 (매번 실행해도 동일한 ID 유지) STORE_IDS = { @@ -93,8 +93,8 @@ async def init_dummy_data(): for q in store["questions"] ] - # qa_queries 전체 교체 (replace_one) - qa_queries_col.replace_one( + # queries 전체 교체 (replace_one) + queries_col.replace_one( {"_id": store_id}, { "_id": store_id, @@ -121,8 +121,8 @@ async def init_dummy_data(): for r in store["reviews"] ] - # reviews_denorm 전체 교체 (replace_one) - reviews_denorm_col.replace_one( + # reviews 전체 교체 (replace_one) + reviews_col.replace_one( {"_id": store_id}, { "_id": store_id, diff --git a/msa-ai-service/app/services/change_stream_service.py b/msa-ai-service/app/services/change_stream_service.py deleted file mode 100644 index 98eb24f..0000000 --- a/msa-ai-service/app/services/change_stream_service.py +++ /dev/null @@ -1,268 +0,0 @@ -# MongoDB 이벤트 감시 후 자동 처리 트리거 -""" -MongoDB Change Stream 이벤트 감시 서비스 -- qa_queries 변경 시: 새로운 질문 → queries_embedding 업데이트 + process_query 호출 -- reviews_denorm 변경 시: 새로운 리뷰 → reviews_embedding 업데이트 -""" -# app/services/change_stream_service.py 
-import threading -from datetime import datetime -from app.db.mongodb import get_collection -from app.services.embedding_service import embed_and_label_question, embed_and_label_review -from app.services.rag_service import process_query - -qa_queries_col = get_collection("qa_queries") -reviews_denorm_col = get_collection("reviews_denorm") -queries_embedding_col = get_collection("queries_embedding") -reviews_embedding_col = get_collection("reviews_embedding") - -# 새 질문 처리 -def process_new_questions(change): - print("🟢 New question change detected:", change) - full_doc = change["fullDocument"] - - for menu in full_doc.get("menus", []): - for q in menu.get("questions", []): - # 이미 처리된 request_id 건너뛰기 ✅ (기존에도 있었음, 유지) - queries_doc = queries_embedding_col.find_one({"_id": full_doc["_id"]}) - existing_ids = [] - if queries_doc: - for m in queries_doc["menus"]: - if m["menu_id"] == menu["menu_id"]: - existing_ids = [qe["request_id"] for qe in m.get("questions_embedding", [])] - if q["request_id"] in existing_ids: - continue - - # 라벨링 + 임베딩 - label, polarity, embedding = embed_and_label_question(q["question"]) - - # 메뉴 없으면 생성 - queries_embedding_col.update_one( - {"_id": full_doc["_id"], "menus.menu_id": menu["menu_id"]}, - { - "$setOnInsert": { - "_id": full_doc["_id"], - "store_name": full_doc["store_name"], - "menus": [ - { - "menu_id": menu["menu_id"], - "menu_name": menu["menu_name"], - "questions_embedding": [] - } - ] - } - }, - upsert=True - ) - - # 질문 추가 - queries_embedding_col.update_one( - {"_id": full_doc["_id"], "menus.menu_id": menu["menu_id"]}, - { - "$push": { - "menus.$.questions_embedding": { - "request_id": q["request_id"], - "question": q["question"], - "label": label, - "polarity": polarity, - "embedding": embedding, - "created_at": datetime.utcnow() - } - }, - "$set": {"updated_at": datetime.utcnow(), "store_name": full_doc["store_name"]} - } - ) - - # RAG 실행 (qa_answers 생성까지) - query_emb = { - "request_id": q["request_id"], - "question": 
q["question"], - "label": label, - "polarity": polarity, - "embedding": embedding - } - process_query(full_doc, menu, query_emb) - - -# 새 리뷰 처리 -def process_new_reviews(change): - print("🟢 New review change detected:", change) - full_doc = change["fullDocument"] - - for menu in full_doc.get("menus", []): - for r in menu.get("reviews", []): - reviews_doc = reviews_embedding_col.find_one({"_id": full_doc["_id"]}) - existing_ids = [] - if reviews_doc: - for m in reviews_doc["menus"]: - if m["menu_id"] == menu["menu_id"]: - existing_ids = [re["review_id"] for re in m.get("reviews_embedding", [])] - if r["review_id"] in existing_ids: - continue # ✅ 리뷰 단위 비교 (updated_at 신뢰 안 함) - - # 라벨링 + 임베딩 - label, polarity, embedding = embed_and_label_review(r["text"]) - - # 메뉴 없으면 생성 - reviews_embedding_col.update_one( - {"_id": full_doc["_id"], "menus.menu_id": menu["menu_id"]}, - { - "$setOnInsert": { - "_id": full_doc["_id"], - "store_name": full_doc["store_name"], - "menus": [ - { - "menu_id": menu["menu_id"], - "menu_name": menu["menu_name"], - "reviews_embedding": [] - } - ] - } - }, - upsert=True - ) - - # 리뷰 추가 - reviews_embedding_col.update_one( - {"_id": full_doc["_id"], "menus.menu_id": menu["menu_id"]}, - { - "$push": { - "menus.$.reviews_embedding": { - "review_id": r["review_id"], - "text": r["text"], - "label": label, - "polarity": polarity, - "embedding": embedding, - "updated_at": datetime.utcnow() - } - }, - "$set": {"updated_at": datetime.utcnow(), "store_name": full_doc["store_name"]} - } - ) - - -# ✅ 추가: 서버 시작 시 bootstrap 함수들 -def bootstrap_unanswered_questions(): - print("🚀 Bootstrap unanswered questions 실행") - for full_doc in qa_queries_col.find({}): - for menu in full_doc.get("menus", []): - for q in menu.get("questions", []): - queries_doc = queries_embedding_col.find_one({"_id": full_doc["_id"]}) - existing_ids = [] - if queries_doc: - for m in queries_doc.get("menus", []): - if m["menu_id"] == menu["menu_id"]: - existing_ids = [qe["request_id"] for qe in 
m.get("questions_embedding", [])] - if q["request_id"] in existing_ids: - continue - - label, polarity, embedding = embed_and_label_question(q["question"]) - - queries_embedding_col.update_one( - {"_id": full_doc["_id"], "menus.menu_id": menu["menu_id"]}, - {"$push": {"menus.$.questions_embedding": { - "request_id": q["request_id"], - "question": q["question"], - "label": label, - "polarity": polarity, - "embedding": embedding, - "created_at": datetime.utcnow() - }}}, - upsert=True - ) - - query_emb = { - "request_id": q["request_id"], - "question": q["question"], - "label": label, - "polarity": polarity, - "embedding": embedding - } - process_query(full_doc, menu, query_emb) - print("✅ Bootstrap unanswered questions 완료") - - -def bootstrap_reviews_embedding(): - print("🚀 Bootstrap reviews embedding 실행") - for full_doc in reviews_denorm_col.find({}): - for menu in full_doc.get("menus", []): - for r in menu.get("reviews", []): - reviews_doc = reviews_embedding_col.find_one({"_id": full_doc["_id"]}) - existing_ids = [] - if reviews_doc: - for m in reviews_doc.get("menus", []): - if m["menu_id"] == menu["menu_id"]: - existing_ids = [re["review_id"] for re in m.get("reviews_embedding", [])] - if r["review_id"] in existing_ids: - continue - - # 라벨링 + 임베딩 - label, polarity, embedding = embed_and_label_review(r["text"]) - - # 1. store 문서 없으면 생성 - reviews_embedding_col.update_one( - {"_id": full_doc["_id"]}, - { - "$setOnInsert": { - "_id": full_doc["_id"], - "store_name": full_doc["store_name"], - "menus": [] - } - }, - upsert=True - ) - - # ✅ 2. 해당 menu_id 없으면 메뉴 추가 - reviews_embedding_col.update_one( - {"_id": full_doc["_id"], "menus.menu_id": {"$ne": menu["menu_id"]}}, - { - "$push": { - "menus": { - "menu_id": menu["menu_id"], - "menu_name": menu["menu_name"], - "reviews_embedding": [] - } - } - } - ) - - # 3. 
이제 안전하게 리뷰 추가 - reviews_embedding_col.update_one( - {"_id": full_doc["_id"], "menus.menu_id": menu["menu_id"]}, - { - "$push": { - "menus.$.reviews_embedding": { - "review_id": r["review_id"], - "text": r["text"], - "label": label, - "polarity": polarity, - "embedding": embedding, - "updated_at": datetime.utcnow() - } - }, - "$set": {"updated_at": datetime.utcnow(), "store_name": full_doc["store_name"]} - } - ) - print("✅ Bootstrap reviews embedding 완료") - - - -# Change Stream 워처 -def watch_queries(): - with qa_queries_col.watch(full_document="updateLookup") as stream: - for change in stream: - if change["operationType"] in ("insert", "replace", "update"): - process_new_questions(change) - -def watch_reviews(): - with reviews_denorm_col.watch(full_document="updateLookup") as stream: - for change in stream: - if change["operationType"] in ("insert", "replace", "update"): - process_new_reviews(change) - -def start_watchers(): - bootstrap_unanswered_questions() - bootstrap_reviews_embedding() - - threading.Thread(target=watch_queries, daemon=True).start() - threading.Thread(target=watch_reviews, daemon=True).start() diff --git a/msa-ai-service/app/services/embedding_service.py b/msa-ai-service/app/services/embedding_service.py index f3f61c1..0523db1 100644 --- a/msa-ai-service/app/services/embedding_service.py +++ b/msa-ai-service/app/services/embedding_service.py @@ -1,83 +1,144 @@ -# 질문/리뷰를 임베딩으로 변환 - -""" -질문 / 리뷰 임베딩 + 라벨링 서비스 -- OpenAI GPT를 사용하여 label / polarity 분류 -- OpenAI Embedding API를 사용하여 벡터 생성 -- 결과를 MongoDB queries_embedding / reviews_embedding 에 반영 -""" - -from datetime import datetime -from openai import OpenAI -import json -from app.db.mongodb import get_collection -from app.core.config import OPENAI_API_KEY, REVIEW_LABELS, POLARITY_LABELS - -# OpenAI 클라이언트 초기화 -client = OpenAI(api_key=OPENAI_API_KEY) - -# MongoDB 컬렉션 핸들 -qa_queries_col = get_collection("qa_queries") -reviews_denorm_col = get_collection("reviews_denorm") -queries_embedding_col = 
get_collection("queries_embedding") -reviews_embedding_col = get_collection("reviews_embedding") +import grpc +import model_pb2, model_pb2_grpc # gRPC proto에서 생성된 코드 + +class EmbeddingService: + def __init__(self, host="model-service", port=50051): + channel = grpc.insecure_channel(f"{host}:{port}") + self.stub = model_pb2_grpc.ModelServiceStub(channel) + + def embed_and_label(self, text, meta): + request = model_pb2.EmbeddingRequest(text=text, meta=meta) + response = self.stub.GetEmbedding(request) + return response.status + + + + +############# 수정 단계 코드 ############# +# # 질문/리뷰에서 임베딩 + 라벨링 추출 +# # 임베딩: kosimCSE +# # 라벨링: OpenAI API + +# """ +# 질문 / 리뷰 임베딩 + 라벨링 서비스 +# - OpenAI GPT를 사용하여 label / polarity 분류 +# - OpenAI Embedding API를 사용하여 벡터 생성 +# - 결과를 MongoDB queries_embedding / reviews_embedding 에 반영 +# """ + +# from openai import OpenAI +# import json +# from app.core.config import OPENAI_API_KEY, REVIEW_LABELS, POLARITY_LABELS +# from app.ml.embedding_model import embedding_model +# import requests +# from app.core.config import MODEL_SERVICE_URL + +# # OpenAI 클라이언트 초기화 +# client = OpenAI(api_key=OPENAI_API_KEY) + +# # # MongoDB 컬렉션 핸들 +# # queries_col = get_collection("queries") +# # reviews_col = get_collection("reviews") +# # queries_embedding_col = get_collection("queries_embedding") +# # reviews_embedding_col = get_collection("reviews_embedding") + + +# def get_embedding(text: str): +# response = requests.post(MODEL_SERVICE_URL, json={"text": text}) +# response.raise_for_status() +# return response.json()["embedding"] + + +# def embed_and_label(text: str, mode: str): +# """ +# 텍스트(리뷰/질문)를 라벨링 + 임베딩으로 변환 +# mode: "review" 또는 "question" +# """ +# target_word = "리뷰" if mode == "review" else "질문" +# prompt = f""" +# 너는 리뷰 분석기야. {target_word} 문장을 보고 아래 후보 중 라벨과 polarity를 정해줘. +# 라벨 후보: {", ".join(REVIEW_LABELS)} +# 폴라리티 후보: {", ".join(POLARITY_LABELS)} +# 출력 형식: JSON {{ "label": "...", "polarity": "..." 
}} +# {target_word}: {text} +# """ + +# # 라벨링 +# resp = client.chat.completions.create( +# model="gpt-4.1-mini", +# messages=[{"role": "user", "content": prompt}], +# response_format={"type": "json_object"} +# ) +# parsed = json.loads(resp.choices[0].message.content) +# label = parsed["label"] +# polarity = parsed["polarity"] + +# # 임베딩 +# embedding = get_embedding(text) +# # embedding = embedding_model.encode([text])[0] + +# return label, polarity, embedding + + + + + + + + + +##### 코드 리팩토링 전 (완전 옛날)##### # 질문 라벨링 + 임베딩 -def embed_and_label_question(question: str): - prompt = f""" - 너는 리뷰 분석기야. 질문 문장을 보고 아래 후보 중 라벨과 polarity를 정해줘. - 라벨 후보: {", ".join(REVIEW_LABELS)} - 폴라리티 후보: {", ".join(POLARITY_LABELS)} - 출력 형식: JSON {{ "label": "...", "polarity": "..." }} - 질문: {question} - """ - resp = client.chat.completions.create( - model="gpt-41-mini", - messages=[{"role": "user", "content": prompt}], - response_format={"type": "json_object"} - ) - - # 최신 SDK에서는 .content 안에 JSON string 들어옴 - parsed = json.loads(resp.choices[0].message.content) - label = parsed["label"] - polarity = parsed["polarity"] - - # OpenAI Embedding API 호출 - embed_resp = client.embeddings.create( - model="text-embedding-3-small", - input=question - ) - embedding = embed_resp.data[0].embedding - - return label, polarity, embedding - - -# 리뷰 라벨링 + 임베딩 -def embed_and_label_review(text: str): - prompt = f""" - 너는 리뷰 분석기야. 리뷰 문장을 보고 아래 후보 중 라벨과 polarity를 정해줘. - 라벨 후보: {", ".join(REVIEW_LABELS)} - 폴라리티 후보: {", ".join(POLARITY_LABELS)} - 출력 형식: JSON {{ "label": "...", "polarity": "..." 
}} - 리뷰: {text} - """ - resp = client.chat.completions.create( - model="gpt-41-mini", - messages=[{"role": "user", "content": prompt}], - response_format={"type": "json_object"} - ) - - parsed = json.loads(resp.choices[0].message.content) - label = parsed["label"] - polarity = parsed["polarity"] - - embed_resp = client.embeddings.create( - model="text-embedding-3-small", - input=text - ) - embedding = embed_resp.data[0].embedding - - return label, polarity, embedding +# def embed_and_label_question(question: str): +# prompt = f""" +# 너는 리뷰 분석기야. 질문 문장을 보고 아래 후보 중 라벨과 polarity를 정해줘. +# 라벨 후보: {", ".join(REVIEW_LABELS)} +# 폴라리티 후보: {", ".join(POLARITY_LABELS)} +# 출력 형식: JSON {{ "label": "...", "polarity": "..." }} +# 질문: {question} +# """ +# resp = client.chat.completions.create( +# model="gpt-41-mini", +# messages=[{"role": "user", "content": prompt}], +# response_format={"type": "json_object"} +# ) + +# # 최신 SDK에서는 .content 안에 JSON string 들어옴 +# parsed = json.loads(resp.choices[0].message.content) +# label = parsed["label"] +# polarity = parsed["polarity"] + +# # 임베딩 추출 +# embedding = embedding_model.encode([question])[0] + +# return label, polarity, embedding + + +# # 리뷰 라벨링 + 임베딩 +# def embed_and_label_review(text: str): +# # 라벨링 +# prompt = f""" +# 너는 리뷰 분석기야. 리뷰 문장을 보고 아래 후보 중 라벨과 polarity를 정해줘. +# 라벨 후보: {", ".join(REVIEW_LABELS)} +# 폴라리티 후보: {", ".join(POLARITY_LABELS)} +# 출력 형식: JSON {{ "label": "...", "polarity": "..." 
}} +# 리뷰: {text} +# """ +# resp = client.chat.completions.create( +# model="gpt-41-mini", +# messages=[{"role": "user", "content": prompt}], +# response_format={"type": "json_object"} +# ) + +# parsed = json.loads(resp.choices[0].message.content) +# label = parsed["label"] +# polarity = parsed["polarity"] + +# # 임베딩 (로컬 모델) +# embedding = embedding_model.encode([text])[0] + +# return label, polarity, embedding # (옵션) 필요시: 임베딩 문서 업데이트 로직도 여기에 넣을 수 있음 diff --git a/msa-ai-service/app/services/rag_service.py b/msa-ai-service/app/services/rag_service.py index f0a26f5..bd866d4 100644 --- a/msa-ai-service/app/services/rag_service.py +++ b/msa-ai-service/app/services/rag_service.py @@ -1,155 +1,238 @@ -# 응답 생성 로직 (실제 운영 파이프라인, LLM/RAG 호출) -""" -질문 임베딩과 리뷰 임베딩 비교 -label + polarity 필터링 -Top-K 리뷰 선택 -답변 텍스트 생성 및 qa_answers 저장 -""" - -from datetime import datetime -from openai import OpenAI -import numpy as np from app.db.mongodb import get_collection -from app.core.config import OPENAI_API_KEY -from app.services.embedding_service import embed_and_label_question +from app.core.config import ENV +import numpy as np +import logging -client = OpenAI(api_key=OPENAI_API_KEY) +logger = logging.getLogger(__name__) -# Mongo 컬렉션 queries_embedding_col = get_collection("queries_embedding") reviews_embedding_col = get_collection("reviews_embedding") -qa_answers_col = get_collection("qa_answers") - -# 코사인 유사도 -def cosine_similarity(a, b): - a, b = np.array(a), np.array(b) - return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))) - -# GPT 기반 답변 생성 -def generate_answer(store_name, menu_name, question, label, reviews): - review_texts = "\n".join([f"- {r['text']} ({r['polarity']})" for r in reviews]) - - prompt = f""" -당신은 음식점 리뷰 분석기입니다. - -질문: {question} -메뉴: {menu_name} @ {store_name} -리뷰 ({len(reviews)}건): -{review_texts} - -규칙: -1. 반드시 위 리뷰만 사실 근거로 삼아 답변하세요. -2. {label} 속성에 해당하는 리뷰들 중, 긍정/부정 리뷰 개수를 세어라. -3. 
다음 형식으로 요약하라: - - {label} 관련 리뷰 {len(reviews)}건 중 X건은 긍정적이고, Y건은 부정적입니다." - - 마지막에 결론을 붙여라. (예: "대체로 짜다고 합니다", "의견이 갈립니다", "비율이 비슷합니다", "너무 짜다고 합니다") -4. 긍정/부정이라는 단어는 쓰지 말고, {label} 속성에 맞는 자연스러운 한국어 서술형으로 풀어라. - - 예: salty → "짜다" / "짜지 않다" - - 예: quantity → "양이 많다" / "양이 적다" - - 예: spicy → "맵다" / "안맵다" - - 표현은 리뷰 맥락에 맞게 자연스럽게 변형해도 된다. -""" - - resp = client.chat.completions.create( - model="gpt-41-mini", - messages=[{"role": "user", "content": prompt}] - ) - return resp.choices[0].message.content.strip() - -# Change Stream 자동 호출 -def process_query(store_doc, menu, query_emb): - """ - Change Stream에서 새로운 질문 들어왔을 때 실행되는 자동 응답 생성기 - """ - print("📌 process_query 진입:", store_doc["_id"], menu["menu_id"], query_emb["label"]) - - reviews_doc = reviews_embedding_col.find_one({"_id": store_doc["_id"]}) - if not reviews_doc: - return None - - target_menu = next((m for m in reviews_doc["menus"] if m["menu_id"] == menu["menu_id"]), None) - if not target_menu or "reviews_embedding" not in target_menu: - return None - - # 라벨 맞는 리뷰만 (긍/부정 포함) - candidate_reviews = [r for r in target_menu["reviews_embedding"] if r["label"] == query_emb["label"]] - print("후보 리뷰 개수:", len(candidate_reviews)) - if not candidate_reviews: - return None - - # 유사도 top-5 - scored = [(cosine_similarity(query_emb["embedding"], r["embedding"]), r) for r in candidate_reviews] - scored = sorted(scored, key=lambda x: x[0], reverse=True)[:5] - selected_reviews = [r for _, r in scored] - - # GPT 답변 생성 - answer_text = generate_answer( - store_doc["store_name"], - menu["menu_name"], - query_emb["question"], - query_emb["label"], - selected_reviews - ) - print("qa_answers 저장 시도:", query_emb["request_id"]) - # qa_answers 저장 - qa_answers_col.update_one( - {"_id": query_emb["request_id"]}, - {"$set": { - "store_id": store_doc["_id"], - "store_name": store_doc["store_name"], - "menu_id": menu["menu_id"], - "menu_name": menu["menu_name"], - "answer": answer_text, - "label": query_emb["label"], - "polarity": 
query_emb["polarity"], - "created_at": datetime.utcnow() - }}, - upsert=True - ) - - return answer_text - -# 수동 API 호출 -def generate_answer_from_reviews(store_id: str, menu_id: str, question: str): - reviews_doc = reviews_embedding_col.find_one({"_id": store_id}) - if not reviews_doc: - return {"error": "no reviews_embedding found"} - - target_menu = next((m for m in reviews_doc["menus"] if m["menu_id"] == menu_id), None) - if not target_menu or "reviews_embedding" not in target_menu: - return {"error": "no reviews for this menu"} - - # 질문 임베딩 + 라벨링 - label, polarity, embedding = embed_and_label_question(question) - - candidate_reviews = [r for r in target_menu["reviews_embedding"] if r["label"] == label] - if not candidate_reviews: - return {"error": "no matching reviews"} - - scored = [(cosine_similarity(embedding, r["embedding"]), r) for r in candidate_reviews] - scored = sorted(scored, key=lambda x: x[0], reverse=True)[:5] - selected_reviews = [r for _, r in scored] - - answer_text = generate_answer( - reviews_doc["store_name"], - target_menu["menu_name"], - question, - label, - selected_reviews - ) - - qa_answers_col.update_one( - {"_id": question}, - {"$set": { +answers_col = get_collection("answers") + +class RagService: + def run_rag(self, query_id, store_id, menu_id): + query_doc = queries_embedding_col.find_one({"_id": query_id}) + if not query_doc: + return "임베딩 생성 실패" + + query_vec = np.array(query_doc["embedding"]) + + if ENV == "prod": + # --------------------------- + # Atlas Vector Search 방식 + # --------------------------- + pipeline = [ + { + "$vectorSearch": { + "queryVector": query_vec.tolist(), + "path": "embedding", + "numCandidates": 50, + "limit": 5, + "index": "reviews_embedding_index" # Atlas에 생성한 인덱스 이름 + } + }, + {"$match": {"menu_id": menu_id}}, + {"$limit": 1} + ] + results = list(reviews_embedding_col.aggregate(pipeline)) + if not results: + return "관련 리뷰가 없습니다." 
+ top_review = results[0] + + else: + # --------------------------- + # Dev: 로컬 numpy 코사인 유사도 + # --------------------------- + candidates = reviews_embedding_col.find( + {"menu_id": menu_id}, + {"embedding": 1, "review": 1, "label": 1} + ) + + scored = [] + for doc in candidates: + review_vec = np.array(doc["embedding"]) + sim = np.dot(query_vec, review_vec) / ( + np.linalg.norm(query_vec) * np.linalg.norm(review_vec) + ) + scored.append((sim, doc)) + + if not scored: + return "관련 리뷰가 없습니다." + + scored.sort(key=lambda x: x[0], reverse=True) + top_review = scored[0][1] + + # --------------------------- + # Answer 저장 + # --------------------------- + answer_text = f"{top_review['review']} (라벨: {top_review['label']})" + + answers_col.insert_one({ + "_id": query_id, "store_id": store_id, - "store_name": reviews_doc["store_name"], "menu_id": menu_id, - "menu_name": target_menu["menu_name"], "answer": answer_text, - "label": label, - "created_at": datetime.utcnow() - }}, - upsert=True - ) - - return {"answer": answer_text, "reviews_used": [r["text"] for r in selected_reviews]} + "label": query_doc.get("label"), + }) + + logger.info(f"✅ RAG completed: query={query_id}, answer={answer_text}") + return answer_text + + +# # 응답 생성 로직 (실제 운영 파이프라인, LLM/RAG 호출) +# """ +# 질문 임베딩과 리뷰 임베딩 비교 +# label + polarity 필터링 +# Top-K 리뷰 선택 +# 답변 텍스트 생성 및 answers 저장 +# """ + +# from datetime import datetime +# from openai import OpenAI +# import numpy as np +# from app.db.mongodb import get_collection +# from app.core.config import OPENAI_API_KEY +# from app.services.embedding_service import embed_and_label_question + +# client = OpenAI(api_key=OPENAI_API_KEY) + +# # Mongo 컬렉션 +# queries_embedding_col = get_collection("queries_embedding") +# reviews_embedding_col = get_collection("reviews_embedding") +# answers_col = get_collection("answers") + +# # 코사인 유사도 +# def cosine_similarity(a, b): +# a, b = np.array(a), np.array(b) +# return float(np.dot(a, b) / (np.linalg.norm(a) * 
np.linalg.norm(b))) + +# # GPT 기반 답변 생성 +# def generate_answer(store_name, menu_name, question, label, reviews): +# review_texts = "\n".join([f"- {r['text']} ({r['polarity']})" for r in reviews]) + +# prompt = f""" +# 당신은 음식점 리뷰 분석기입니다. + +# 질문: {question} +# 메뉴: {menu_name} @ {store_name} +# 리뷰 ({len(reviews)}건): +# {review_texts} + +# 규칙: +# 1. 반드시 위 리뷰만 사실 근거로 삼아 답변하세요. +# 2. {label} 속성에 해당하는 리뷰들 중, 긍정/부정 리뷰 개수를 세어라. +# 3. 다음 형식으로 요약하라: +# - {label} 관련 리뷰 {len(reviews)}건 중 X건은 긍정적이고, Y건은 부정적입니다." +# - 마지막에 결론을 붙여라. (예: "대체로 짜다고 합니다", "의견이 갈립니다", "비율이 비슷합니다", "너무 짜다고 합니다") +# 4. 긍정/부정이라는 단어는 쓰지 말고, {label} 속성에 맞는 자연스러운 한국어 서술형으로 풀어라. +# - 예: salty → "짜다" / "짜지 않다" +# - 예: quantity → "양이 많다" / "양이 적다" +# - 예: spicy → "맵다" / "안맵다" +# - 표현은 리뷰 맥락에 맞게 자연스럽게 변형해도 된다. +# """ + +# resp = client.chat.completions.create( +# model="gpt-41-mini", +# messages=[{"role": "user", "content": prompt}] +# ) +# return resp.choices[0].message.content.strip() + + + +# # Change Stream 자동 호출 +# def process_query(store_doc, menu, query_emb): +# """ +# Change Stream에서 새로운 질문 들어왔을 때 실행되는 자동 응답 생성기 +# """ +# print("📌 process_query 진입:", store_doc["_id"], menu["menu_id"], query_emb["label"]) + +# reviews_doc = reviews_embedding_col.find_one({"_id": store_doc["_id"]}) +# if not reviews_doc: +# return None + +# target_menu = next((m for m in reviews_doc["menus"] if m["menu_id"] == menu["menu_id"]), None) +# if not target_menu or "reviews_embedding" not in target_menu: +# return None + +# # 라벨 맞는 리뷰만 (긍/부정 포함) +# candidate_reviews = [r for r in target_menu["reviews_embedding"] if r["label"] == query_emb["label"]] +# print("후보 리뷰 개수:", len(candidate_reviews)) +# if not candidate_reviews: +# return None + +# # 유사도 top-5 +# scored = [(cosine_similarity(query_emb["embedding"], r["embedding"]), r) for r in candidate_reviews] +# scored = sorted(scored, key=lambda x: x[0], reverse=True)[:5] +# selected_reviews = [r for _, r in scored] + +# # GPT 답변 생성 +# answer_text = generate_answer( +# 
store_doc["store_name"], +# menu["menu_name"], +# query_emb["question"], +# query_emb["label"], +# selected_reviews +# ) +# print("answers 저장 시도:", query_emb["request_id"]) +# # answers 저장 +# answers_col.update_one( +# {"_id": query_emb["request_id"]}, +# {"$set": { +# "store_id": store_doc["_id"], +# "store_name": store_doc["store_name"], +# "menu_id": menu["menu_id"], +# "menu_name": menu["menu_name"], +# "answer": answer_text, +# "label": query_emb["label"], +# "polarity": query_emb["polarity"], +# "created_at": datetime.utcnow() +# }}, +# upsert=True +# ) + +# return answer_text + +# # 수동 API 호출 +# def generate_answer_from_reviews(store_id: str, menu_id: str, question: str): +# reviews_doc = reviews_embedding_col.find_one({"_id": store_id}) +# if not reviews_doc: +# return {"error": "no reviews_embedding found"} + +# target_menu = next((m for m in reviews_doc["menus"] if m["menu_id"] == menu_id), None) +# if not target_menu or "reviews_embedding" not in target_menu: +# return {"error": "no reviews for this menu"} + +# # 질문 임베딩 + 라벨링 +# label, polarity, embedding = embed_and_label_question(question) + +# candidate_reviews = [r for r in target_menu["reviews_embedding"] if r["label"] == label] +# if not candidate_reviews: +# return {"error": "no matching reviews"} + +# scored = [(cosine_similarity(embedding, r["embedding"]), r) for r in candidate_reviews] +# scored = sorted(scored, key=lambda x: x[0], reverse=True)[:5] +# selected_reviews = [r for _, r in scored] + +# answer_text = generate_answer( +# reviews_doc["store_name"], +# target_menu["menu_name"], +# question, +# label, +# selected_reviews +# ) + +# answers_col.update_one( +# {"_id": question}, +# {"$set": { +# "store_id": store_id, +# "store_name": reviews_doc["store_name"], +# "menu_id": menu_id, +# "menu_name": target_menu["menu_name"], +# "answer": answer_text, +# "label": label, +# "created_at": datetime.utcnow() +# }}, +# upsert=True +# ) + +# return {"answer": answer_text, "reviews_used": 
[r["text"] for r in selected_reviews]} diff --git a/msa-ai-service/app/services/redis_service.py b/msa-ai-service/app/services/redis_service.py new file mode 100644 index 0000000..9b3cd08 --- /dev/null +++ b/msa-ai-service/app/services/redis_service.py @@ -0,0 +1,430 @@ +import asyncio +import logging +import redis +import socket +from datetime import datetime +from app.db.mongodb import get_collection +from app.core.config import ( + REDIS_HOST, REDIS_PORT, + REDIS_REQUEST_STREAM, REDIS_RESPONSE_STREAM, MONGODB_NAME +) +from app.services.embedding_service import EmbeddingService +from app.services.rag_service import RagService + +logger = logging.getLogger(__name__) + +r = redis.Redis( + host=REDIS_HOST, + port=REDIS_PORT, + decode_responses=True, + encoding='utf-8' +) + +GROUP_NAME = "ai-service" +CONSUMER_NAME = "fastapi-worker" +# CONSUMER_NAME = f"fastapi-{socket.gethostname()}" + +queries_col = get_collection("queries") + +embedding_svc = EmbeddingService() +rag_svc = RagService() + + +# --------------------------- +# Consumer 역할 +# --------------------------- +def init_consumer_group(): + """Redis Consumer Group 초기화""" + try: + r.xgroup_create( + REDIS_REQUEST_STREAM, + GROUP_NAME, + id="$", + mkstream=True + ) + logger.info("✅ Redis consumer group created") + + except redis.exceptions.ResponseError as e: + if "BUSYGROUP" in str(e): + logger.info("ℹ️ Consumer group already exists") + else: + raise + + +def read_request(): + """Redis Stream에서 새 요청 읽기""" + msgs = r.xreadgroup( + GROUP_NAME, + CONSUMER_NAME, + {REDIS_REQUEST_STREAM: ">"}, + count=1, + block=5000 + ) + + logger.info(f"📦 Raw msgs: {msgs}") + + if not msgs: + return None, None + + stream, elements = msgs[0] + for msg_id, fields in elements: + logger.info(f"📥 Received from {stream}: id={msg_id}, fields={fields}") + return msg_id, fields + + return None, None + + +# --------------------------- +# Producer 역할 +# --------------------------- +def add_request(request_id: str, store_id: str, menu_id: 
str, query: str): + """FastAPI에서 직접 Redis 요청 스트림에 메시지 추가""" + return r.xadd(REDIS_REQUEST_STREAM, { + "request_id": request_id, + "store_id": store_id, + "menu_id": menu_id, + "query": query + }) + + +def add_response(request_id: str, answer: str, source: str = "generated"): + """FastAPI가 생성한 응답을 Redis 응답 스트림에 push""" + r.xadd(REDIS_RESPONSE_STREAM, { + "request_id": request_id, + "answer": answer, + "source": source + }) + logger.info(f"✅ Response pushed for {request_id}") + + +# --------------------------- +# Worker Loop +# --------------------------- +async def worker_loop(): + """Redis Worker Loop""" + while True: + msg_id, msg = read_request() + if msg: + try: + logger.info(f"📥 Processing message: {msg}") + + request_id = msg["request_id"] + store_id = msg["store_id"] + menu_id = msg["menu_id"] + query = msg.get("query") + + # 1. MongoDB 저장 + result = queries_col.update_one( + {"_id": store_id, "menus.menu_id": menu_id}, + { + "$push": {"menus.$.queries": { + "query_id": request_id, + "query": query + }}, + "$set": {"updated_at": datetime.utcnow()} + } + ) + + if result.matched_count == 0: + queries_col.update_one( + {"_id": store_id}, + { + "$push": { + "menus": { + "menu_id": menu_id, + "queries": [{ + "query_id": request_id, + "query": query + }] + } + }, + "$set": {"updated_at": datetime.utcnow()} + }, + upsert=True + ) + logger.info(f"🆕 New menu created: store={store_id}, menu={menu_id}") + else: + logger.info(f"➕ Query pushed: store={store_id}, menu={menu_id}") + + logger.info( + f"✅ Saved to MongoDB: store={store_id}, menu={menu_id}, request={request_id}, db={MONGODB_NAME}" + ) + + # 2. gRPC로 model-service 호출 → 임베딩/라벨링 + meta = {"type": "query", "query_id": request_id, "store_id": store_id, "menu_id": menu_id} + embedding_svc.embed_and_label(query, meta) + + # 3. RAG 실행 → answers 저장 + answer = rag_svc.run_rag(request_id, store_id, menu_id) + + # 4. 
Redis 응답 스트림 push + add_response(request_id, answer) + + # ✅ 모든 작업 성공 후 ack + r.xack(REDIS_REQUEST_STREAM, GROUP_NAME, msg_id) + logger.info(f"👍 Acked message {msg_id}") + + except Exception as e: + logger.error(f"❌ Worker 처리 실패: {e}", exc_info=True) + + await asyncio.sleep(0.1) + + +def start_redis_consumer(): + """FastAPI lifespan에서 호출""" + init_consumer_group() + asyncio.create_task(worker_loop()) + logger.info("🚀 Redis consumer started") + + +# # 레디스 컨슈머 + 프로듀서 +# # 임시테스트 완료 + +# import asyncio +# import logging +# import redis +# import socket +# from datetime import datetime +# from app.db.mongodb import get_collection +# from app.core.config import ( +# REDIS_HOST, REDIS_PORT, +# REDIS_REQUEST_STREAM, REDIS_RESPONSE_STREAM, MONGODB_NAME +# ) +# from app.services.embedding_service import EmbeddingService +# from app.services.rag_service import RagService + +# logger = logging.getLogger(__name__) + +# r = redis.Redis( +# host=REDIS_HOST, +# port=REDIS_PORT, +# decode_responses=True, +# encoding='utf-8') + +# GROUP_NAME = "ai-service" +# CONSUMER_NAME = "fastapi-worker" +# # CONSUMER_NAME = f"fastapi-{socket.gethostname()}" + +# queries_col = get_collection("queries") + +# embedding_svc = EmbeddingService() +# rag_svc = RagService() + + +# # --------------------------- +# # Consumer 역할 +# # --------------------------- +# last_id = "0" +# def init_consumer_group(): +# """Redis Consumer Group 초기화""" +# try: +# r.xgroup_create( +# REDIS_REQUEST_STREAM, +# GROUP_NAME, +# id="$", +# mkstream=True +# ) +# logger.info("✅ Redis consumer group created") + +# except redis.exceptions.ResponseError as e: +# if "BUSYGROUP" in str(e): +# logger.info("ℹ️ Consumer group already exists") +# else: +# raise +# def read_request(): +# msgs = r.xreadgroup( +# GROUP_NAME, +# CONSUMER_NAME, +# {REDIS_REQUEST_STREAM: ">"}, # backlog 포함 전부 읽기 +# count=1, +# block=5000 +# ) + +# logger.info(f"📦 Raw msgs: {msgs}") + +# if not msgs: +# return None + +# stream, elements = msgs[0] +# for 
msg_id, fields in elements: +# logger.info(f"📥 Received from {stream}: id={msg_id}, fields={fields}") +# r.xack(REDIS_REQUEST_STREAM, GROUP_NAME, msg_id) +# return fields + +# """ +# def read_request(): +# global last_id + +# msgs = r.xreadgroup( +# GROUP_NAME, +# CONSUMER_NAME, +# {REDIS_REQUEST_STREAM: last_id}, +# count=1, +# block=5000 +# ) + +# logger.info(f"📦 Raw msgs: {msgs}") # 무조건 찍기 + +# if not msgs: +# return None + +# stream, elements = msgs[0] +# if not elements: +# return None + +# msg_id, fields = elements[0] +# logger.info(f"📦 Read from stream: id={msg_id}, fields={fields}") +# r.xack(REDIS_REQUEST_STREAM, GROUP_NAME, msg_id) + +# if last_id == "0": +# last_id = ">" + +# return fields + +# """ + +# # def read_request(): +# # msgs = r.xreadgroup( +# # GROUP_NAME, +# # CONSUMER_NAME, +# # {REDIS_REQUEST_STREAM: ">"}, +# # count=1, +# # block=5000 +# # ) +# # logger.info(f"📦 Raw msgs: {msgs}") + +# # if not msgs: +# # return None +# # msg_id, fields = msgs[0][1][0] + +# # # _, elements = msgs[0] +# # # msg_id, fields = elements[0] +# # logger.info(f"📦 Read from stream: id={msg_id}, fields={fields}") +# # r.xack(REDIS_REQUEST_STREAM, GROUP_NAME, msg_id) +# # return fields + +# # def read_request(process_old = False): +# # """Redis Stream에서 요청 읽기""" +# # last_id = "0" if process_old else ">" +# # msgs = r.xreadgroup( +# # GROUP_NAME, +# # CONSUMER_NAME, +# # {REDIS_REQUEST_STREAM: "last_id"}, # >로 해보기 +# # count=1, +# # block=5000) + +# # if not msgs: +# # logger.debug("⏳ No new messages") +# # return None + +# # _, elements = msgs[0] +# # for msg_id, fields in elements: +# # logger.info(f"📦 Read from stream: id={msg_id}, fields={fields}") +# # r.xack(REDIS_REQUEST_STREAM, GROUP_NAME, msg_id) +# # yield fields + + +# # --------------------------- +# # Producer 역할 +# # --------------------------- +# def add_request(request_id: str, store_id: str, menu_id: str, query: str): +# """ +# FastAPI에서 직접 Redis 요청 스트림에 메시지 추가 (테스트/내부용) +# """ +# return 
r.xadd(REDIS_REQUEST_STREAM, { +# "request_id": request_id, +# "store_id": store_id, +# "menu_id": menu_id, +# "query": query +# }) + + +# def add_response(request_id: str, answer: str, source: str = "generated"): +# """ +# FastAPI가 생성한 응답을 Redis 응답 스트림에 push +# """ +# return r.xadd(REDIS_RESPONSE_STREAM, { +# "request_id": request_id, +# "answer": answer +# }) +# logger.info(f"✅ Response pushed for {request_id}") + + + +# # --------------------------- +# # Worker Loop +# # --------------------------- +# async def worker_loop(): +# """Redis Worker Loop""" +# while True: +# msg = read_request() +# if msg: +# logger.info(f"📥 Received: {msg}") + +# request_id = msg["request_id"] +# store_id = msg["store_id"] +# menu_id = msg["menu_id"] +# query = msg.get("query") + + +# # 1. 기존 store + menu 찾고 질문 push +# result = queries_col.update_one( +# {"_id": store_id, "menus.menu_id": menu_id}, +# { +# "$push": { +# "menus.$.queries": { +# "query_id": request_id, +# "query": query +# } +# }, +# "$set": {"updated_at": datetime.utcnow()} +# } +# ) + +# # 1-2. store_id 문서가 없거나 menu_id가 없을 때 → 새로 생성 +# if result.matched_count == 0: +# queries_col.update_one( +# {"_id": store_id}, +# { +# "$push": { +# "menus": { +# "menu_id": menu_id, +# "queries": [ +# { +# "query_id": request_id, +# "query": query +# } +# ] +# } +# }, +# "$set": {"updated_at": datetime.utcnow()} +# }, +# upsert=True +# ) + +# logger.info(f"✅ Saved to MongoDB: store={store_id}, menu={menu_id}, request={request_id}, db={MONGODB_NAME}") + + +# # 2. gRPC로 model-service 호출 → 임베딩/라벨링 + queries_embedding 저장 +# meta = {"type": "query", "query_id": request_id, "store_id": store_id, "menu_id": menu_id} +# embedding_svc.embed_and_label(query, meta) + +# # 3. RAG 실행 → answers 저장 +# answer = rag_svc.run_rag(request_id, store_id, menu_id) + +# # 4. Redis 응답 스트림 push (Spring이 사용자한테 전달) +# add_response(request_id, answer) + + +# # # 👉 간단히 확인만: 받은 질문 그대로 응답 +# # answer = f"'{query}'에 대한 임시 응답입니다." 
+# # add_response(request_id, answer) + +# await asyncio.sleep(0.1) + + +# def start_redis_consumer(): +# """FastAPI lifespan에서 호출""" +# init_consumer_group() +# asyncio.create_task(worker_loop()) +# logger.info("🚀 Redis consumer started") \ No newline at end of file diff --git a/msa-ai-service/app/services/review_watcher.py b/msa-ai-service/app/services/review_watcher.py new file mode 100644 index 0000000..e9220b0 --- /dev/null +++ b/msa-ai-service/app/services/review_watcher.py @@ -0,0 +1,53 @@ +import asyncio +import logging +from bson import ObjectId +from datetime import datetime +from app.db.mongodb import get_collection +from app.services.embedding_service import EmbeddingService + +logger = logging.getLogger(__name__) + +reviews_col = get_collection("reviews") +reviews_embedding_col = get_collection("reviews_embedding") + +embedding_svc = EmbeddingService() + +async def watch_reviews(): + """ + MongoDB Change Stream으로 reviews 컬렉션 감시 + """ + pipeline = [{"$match": {"operationType": "insert"}}] + + try: + with reviews_col.watch(pipeline) as stream: + for change in stream: + full_doc = change["fullDocument"] + + store_id = str(full_doc["_id"]) + for menu in full_doc.get("menus", []): + menu_id = menu["menu_id"] + + for review in menu.get("reviews", []): + review_id = review["review_id"] + text = review["text"] + + # 중복 체크 + exists = reviews_embedding_col.find_one({"_id": review_id}) + if exists: + logger.info(f"⚠️ Review {review_id} already embedded, skipping") + continue + + # gRPC 호출 (model-service) + meta = { + "type": "review", + "review_id": review_id, + "store_id": store_id, + "menu_id": menu_id, + } + status = embedding_svc.embed_and_label(text, meta) + logger.info(f"✅ Embedded review {review_id}, status={status}") + + except Exception as e: + logger.error(f"❌ Change Stream stopped: {e}") + await asyncio.sleep(5) + await watch_reviews() # 재시작 diff --git a/msa-ai-service/log_config.yaml b/msa-ai-service/log_config.yaml new file mode 100644 index 
0000000..b331b51 --- /dev/null +++ b/msa-ai-service/log_config.yaml @@ -0,0 +1,34 @@ +version: 1 +disable_existing_loggers: false +formatters: + default: + (): "uvicorn.logging.DefaultFormatter" + fmt: "%(levelprefix)s %(message)s" + use_colors: null + access: + (): "uvicorn.logging.AccessFormatter" + # 아래 fmt와 datefmt를 수정하여 접속 로그 형식을 바꿀 수 있습니다. + fmt: '%(asctime)s %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s' + datefmt: "%Y-%m-%d %H:%M:%S" # 원하는 날짜/시간 포맷 +handlers: + default: + formatter: default + class: logging.StreamHandler + stream: ext://sys.stderr + access: + formatter: access + class: logging.StreamHandler + stream: ext://sys.stdout +loggers: + uvicorn: + handlers: + - default + level: INFO + propagate: false + uvicorn.error: + level: INFO + uvicorn.access: + handlers: + - access + level: INFO + propagate: false \ No newline at end of file diff --git a/msa-ai-service/requirements.txt b/msa-ai-service/requirements.txt index 944f04c..bac2c0f 100644 --- a/msa-ai-service/requirements.txt +++ b/msa-ai-service/requirements.txt @@ -9,4 +9,7 @@ certifi dnspython numpy sentence-transformers -torch \ No newline at end of file +torch +redis +grpcio +grpcio-tools \ No newline at end of file diff --git a/msa-ai-service/task-definition.json b/msa-ai-service/task-definition.json index 82de17a..eb2cd1a 100644 --- a/msa-ai-service/task-definition.json +++ b/msa-ai-service/task-definition.json @@ -17,10 +17,10 @@ ], "essential": true, "environment": [ - { - "name": "SPRING_PROFILES_ACTIVE", - "value": "dev" - } + { "name": "ENV", "value": "prod" }, + { "name": "MONGODB_URI", "value": "mongodb+srv://<DB_USER>:<DB_PASSWORD>@team1mongodb.oh0o0np.mongodb.net/?retryWrites=true&w=majority&appName=Team1Mongodb" }, + { "name": "MONGODB_NAME", "value": "ai_service_db" }, + { "name": "OPENAI_API_KEY", "value": 
"<REDACTED-ROTATE-KEY-AND-INJECT-VIA-ECS-SECRETS>" } ], "mountPoints": [], "volumesFrom": [], diff --git a/proto/model.proto b/proto/model.proto new file mode 100644 index 0000000..0551cef --- /dev/null +++ b/proto/model.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package model; + +service ModelService { + rpc GetEmbedding (EmbeddingRequest) returns (EmbeddingResponse); +} + +message EmbeddingRequest { + string text = 1; + map<string, string> meta = 2; +} + +message EmbeddingResponse { + string status = 1; +}