diff --git a/backend/score.py b/backend/score.py
index bd419f793..0fa83d4d4 100644
--- a/backend/score.py
+++ b/backend/score.py
@@ -891,17 +891,14 @@ async def retry_processing(uri=Form(None), userName=Form(None), password=Form(No
     try:
         start = time.time()
         graph = create_graph_database_connection(uri, userName, password, database)
-        chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
+        # chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
         end = time.time()
         elapsed_time = end - start
         json_obj = {'api_name':'retry_processing', 'db_url':uri, 'userName':userName, 'database':database, 'file_name':file_name,'retry_condition':retry_condition, 'logging_time': formatted_time(datetime.now(timezone.utc)), 'elapsed_api_time':f'{elapsed_time:.2f}','email':email}
         logger.log_struct(json_obj, "INFO")
-        if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
-            return create_api_response('Success',message=f"Chunks are not created for the file{file_name}. Please upload again the file to re-process.",data=chunks)
-        else:
-            await asyncio.to_thread(set_status_retry, graph,file_name,retry_condition)
-            return create_api_response('Success',message=f"Status set to Ready to Reprocess for filename : {file_name}")
+        await asyncio.to_thread(set_status_retry, graph,file_name,retry_condition)
+        return create_api_response('Success',message=f"Status set to Ready to Reprocess for filename : {file_name}")
     except Exception as e:
         job_status = "Failed"
         message="Unable to set status to Retry"
diff --git a/backend/src/main.py b/backend/src/main.py
index 4bdb6ba51..4ee3e8a06 100644
--- a/backend/src/main.py
+++ b/backend/src/main.py
@@ -230,7 +230,7 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
 async def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
     logging.info(f'Process file name :{fileName}')
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
         if gcs_file_cache == 'True':
             folder_name = create_gcs_bucket_folder_name_hashed(uri, fileName)
@@ -244,7 +244,7 @@ async def extract_graph_from_file_local_file(uri, userName, password, database,
     return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, True, merged_file_path, retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         if(aws_access_key_id==None or aws_secret_access_key==None):
             raise LLMGraphBuilderException('Please provide AWS access and secret keys')
         else:
@@ -258,7 +258,7 @@ async def extract_graph_from_file_s3(uri, userName, password, database, model, s
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         pages = get_documents_from_web_page(source_url)
         if pages==None or len(pages)==0:
             raise LLMGraphBuilderException(f'Content is not available for given URL : {file_name}')
@@ -267,7 +267,7 @@ async def extract_graph_from_web_page(uri, userName, password, database, model,
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         file_name, pages = get_documents_from_youtube(source_url)
         if pages==None or len(pages)==0:
@@ -277,7 +277,7 @@ async def extract_graph_from_file_youtube(uri, userName, password, database, mod
     return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         file_name, pages = get_documents_from_Wikipedia(wiki_query, language)
         if pages==None or len(pages)==0:
             raise LLMGraphBuilderException(f'Wikipedia page is not available for file : {file_name}')
@@ -286,7 +286,7 @@ async def extract_graph_from_file_Wikipedia(uri, userName, password, database, m
     return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)
 
 async def extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         file_name, pages = get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token)
         if pages==None or len(pages)==0:
             raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
@@ -431,7 +431,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
         # merged_file_path have value only when file uploaded from local
-        if is_uploaded_from_local:
+        if is_uploaded_from_local and bool(is_cancelled_status) == False:
             gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
             if gcs_file_cache == 'True':
                 folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
@@ -511,7 +511,7 @@ async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password,
     return node_count,rel_count,latency_processing_chunk
 
 def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_overlap, retry_condition):
-    if not retry_condition:
+    if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
         logging.info("Break down file into chunks")
         bad_chars = ['"', "\n", "'"]
         for i in range(0,len(pages)):
@@ -532,7 +532,7 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_o
         chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS, params={"filename":file_name})
         if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
-            raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
+            raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file or reprocess the file with option Start From Beginning.")
         else:
             for chunk in chunks:
                 chunk_doc = Document(page_content=chunk['text'], metadata={'id':chunk['id'], 'position':chunk['position']})
@@ -714,15 +714,9 @@ def manually_cancelled_job(graph, filenames, source_types, merged_dir, uri):
         obj_source_node.updated_at = datetime.now()
         graphDb_data_Access = graphDBdataAccess(graph)
         graphDb_data_Access.update_source_node(obj_source_node)
-        count_response = graphDb_data_Access.update_node_relationship_count(file_name)
+        #Update the nodeCount and relCount properties in Document node
+        graphDb_data_Access.update_node_relationship_count(file_name)
         obj_source_node = None
-        merged_file_path = os.path.join(merged_dir, file_name)
-        if source_type == 'local file' and gcs_file_cache == 'True':
-            folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
-            delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
-        else:
-            logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
-            delete_uploaded_local_file(merged_file_path,file_name)
     return "Cancelled the processing job successfully"
 
 def populate_graph_schema_from_text(text, model, is_schema_description_checked, is_local_storage):
@@ -749,10 +743,19 @@ def set_status_retry(graph, file_name, retry_condition):
     obj_source_node.is_cancelled = False
     if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING or retry_condition == START_FROM_BEGINNING:
         obj_source_node.processed_chunk=0
-        if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
-            execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
         obj_source_node.node_count=0
         obj_source_node.relationship_count=0
+        obj_source_node.chunkNodeCount=0
+        obj_source_node.chunkRelCount=0
+        obj_source_node.communityNodeCount=0
+        obj_source_node.communityRelCount=0
+        obj_source_node.entityEntityRelCount=0
+        obj_source_node.entityNodeCount=0
+        obj_source_node.processingTime=0
+        obj_source_node.total_chunks=0
+        if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
+            execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
+
     logging.info(obj_source_node)
     graphDb_data_Access.update_source_node(obj_source_node)
diff --git a/frontend/src/components/FileTable.tsx b/frontend/src/components/FileTable.tsx
index b4ee14ff3..a8587be9d 100644
--- a/frontend/src/components/FileTable.tsx
+++ b/frontend/src/components/FileTable.tsx
@@ -79,6 +79,7 @@ const FileTable: ForwardRefRenderFunction = (props, re
   const columnHelper = createColumnHelper();
   const [columnFilters, setColumnFilters] = useState([]);
   const [isLoading, setIsLoading] = useState(false);
+  const [isCancellingQueue, setIsCancellingQueue] = useState(false);
   const [statusFilter, setStatusFilter] = useState('');
   const [filetypeFilter, setFiletypeFilter] = useState('');
   const [fileSourceFilter, setFileSourceFilter] = useState('');
@@ -833,6 +834,73 @@ const FileTable: ForwardRefRenderFunction = (props, re
     }
   }, [connectionStatus, filesData.length, isReadOnlyUser]);
 
+  const refreshFileData = async () => {
+    try {
+      const res = await getSourceNodes();
+      if (res.data && res.data.status !== 'Failed' && res.data.data.length) {
+        const updatedFiles = res.data.data
+          .map((item: SourceNode) => {
+            const existingFile = filesData.find((f) => f.name === item.fileName);
+            if (existingFile) {
+              // Check if file is in queue
+              const isInQueue = queue.items.some((f) => f.name === item.fileName);
+              return {
+                ...existingFile,
+                status: isInQueue ? 'Waiting' : getFileSourceStatus(item),
+                nodesCount: item?.nodeCount ?? existingFile.nodesCount,
+                relationshipsCount: item?.relationshipCount ?? existingFile.relationshipsCount,
+                processingTotalTime: item?.processingTime ?? existingFile.processingTotalTime,
+              };
+            }
+            return existingFile;
+          })
+          .filter(Boolean);
+
+        setFilesData(updatedFiles as CustomFile[]);
+        setRowSelection((prev) => {
+          const updated = { ...prev };
+          updatedFiles.forEach((file) => {
+            if (file?.status === 'Cancelled' && updated[file.id]) {
+              delete updated[file.id];
+            }
+          });
+          return updated;
+        });
+      }
+    } catch (error) {
+      console.error('Refresh failed:', error);
+    }
+  };
+
+  const cancelQueue = async () => {
+    if (queue.isEmpty()) {
+      showNormalToast('No files in queue to cancel');
+      return;
+    }
+
+    setIsCancellingQueue(true);
+    try {
+      const queuedFileNames = queue.items.map((f) => f.name as string).filter(Boolean);
+      const queuedFileSources = queue.items.map((f) => f.fileSource as string).filter(Boolean);
+      const res = await cancelAPI(queuedFileNames, queuedFileSources);
+
+      if (res.data.status === 'Success') {
+        queue.clear();
+        await refreshFileData();
+
+        showNormalToast(`Successfully cancelled ${queuedFileNames.length} waiting file(s)`);
+      } else {
+        throw new Error(res.data.error || 'Failed to cancel queue');
+      }
+    } catch (err) {
+      if (err instanceof Error) {
+        showErrorToast(`Failed to cancel queue: ${err.message}`);
+      }
+    } finally {
+      setIsCancellingQueue(false);
+    }
+  };
+
   const cancelHandler = async (fileName: string, id: string, fileSource: string) => {
     setFilesData((prevfiles) =>
       prevfiles.map((curfile) => {
@@ -860,6 +928,11 @@ const FileTable: ForwardRefRenderFunction = (props, re
         return curfile;
       })
     );
+    setRowSelection((prev) => {
+      const updated = { ...prev };
+      delete updated[id];
+      return updated;
+    });
     setProcessedCount((prev) => {
       if (prev == batchSize) {
         return batchSize - 1;
@@ -1036,14 +1109,44 @@ const FileTable: ForwardRefRenderFunction = (props, re
       );
     } else if (connectionStatus) {
+      const queueSize = queue.size();
       return (
-        {`Large files may be partially processed up to 10K characters due to resource limit.`}
+        {`Large files may be partially processed up to 10K characters due to resource limit.`}
+        {queueSize > 0 && (
+          {isCancellingQueue
+            ? 'Cancelling files in waiting queue...'
+            : `${queueSize} file${queueSize !== 1 ? 's' : ''} waiting in queue`}
+          {!isReadOnlyUser && (
+          )}
+        )}
       );
diff --git a/frontend/src/components/Graph/GraphViewModal.tsx b/frontend/src/components/Graph/GraphViewModal.tsx
index 0df748050..b224a0275 100644
--- a/frontend/src/components/Graph/GraphViewModal.tsx
+++ b/frontend/src/components/Graph/GraphViewModal.tsx
@@ -233,9 +233,37 @@ const GraphViewModal: React.FunctionComponent = ({
   }, [open]);
 
   useEffect(() => {
-    if (debouncedQuery) {
-      handleSearch(debouncedQuery);
-    }
+    const query = debouncedQuery.toLowerCase();
+    const updatedNodes = node.map((nodeVal) => {
+      if (query === '') {
+        return {
+          ...nodeVal,
+          selected: false,
+          size: graphLabels.nodeSize,
+        };
+      }
+      const { id, properties, caption } = nodeVal;
+      const propertiesMatch = properties?.id?.toLowerCase().includes(query);
+      const match = id.toLowerCase().includes(query) || propertiesMatch || caption?.toLowerCase().includes(query);
+
+      if (match) {
+        console.log({ id, caption });
+      }
+      return {
+        ...nodeVal,
+        selected: match,
+      };
+    });
+    const matchedNodes = updatedNodes.filter((n) => n.selected);
+    console.log(`Total matches: ${matchedNodes.length} out of ${node.length} nodes`);
+    const updatedRelationships = relationship.map((rel) => {
+      return {
+        ...rel,
+        selected: false,
+      };
+    });
+    setNode(updatedNodes);
+    setRelationship(updatedRelationships);
   }, [debouncedQuery]);
 
   const mouseEventCallbacks = useMemo(
@@ -291,37 +319,6 @@ const GraphViewModal: React.FunctionComponent = ({
     return relationship.find((relationshipVal) => relationshipVal.id === selected.id);
   }, [selected, relationship, node]);
 
-  const handleSearch = useCallback(
-    (value: string) => {
-      const query = value.toLowerCase();
-      const updatedNodes = node.map((nodeVal) => {
-        if (query === '') {
-          return {
-            ...nodeVal,
-            selected: false,
-            size: graphLabels.nodeSize,
-          };
-        }
-        const { id, properties, caption } = nodeVal;
-        const propertiesMatch = properties?.id?.toLowerCase().includes(query);
-        const match = id.toLowerCase().includes(query) || propertiesMatch || caption?.toLowerCase().includes(query);
-        return {
-          ...nodeVal,
-          selected: match,
-        };
-      });
-      const updatedRelationships = relationship.map((rel) => {
-        return {
-          ...rel,
-          selected: false,
-        };
-      });
-      setNode(updatedNodes);
-      setRelationship(updatedRelationships);
-    },
-    [node, relationship]
-  );
-
   if (!open) {
     return <>;
   }
diff --git a/frontend/src/components/Graph/SchemaViz.tsx b/frontend/src/components/Graph/SchemaViz.tsx
index 0bb7d9762..b8b8c0dfb 100644
--- a/frontend/src/components/Graph/SchemaViz.tsx
+++ b/frontend/src/components/Graph/SchemaViz.tsx
@@ -95,9 +95,32 @@ const SchemaViz: React.FunctionComponent = ({
   }, [open]);
 
   useEffect(() => {
-    if (debouncedQuery) {
-      handleSearch(debouncedQuery);
-    }
+    const query = debouncedQuery.toLowerCase();
+    const updatedNodes = nodes.map((node) => {
+      if (query === '') {
+        return {
+          ...node,
+          selected: false,
+          size: graphLabels.nodeSize,
+        };
+      }
+      const { id, properties, caption } = node;
+      const propertiesMatch = properties?.id?.toLowerCase().includes(query);
+      const match = id.toLowerCase().includes(query) || propertiesMatch || caption?.toLowerCase().includes(query);
+      return {
+        ...node,
+        selected: match,
+      };
+    });
+    // deactivating any active relationships
+    const updatedRelationships = relationships.map((rel) => {
+      return {
+        ...rel,
+        selected: false,
+      };
+    });
+    setNodes(updatedNodes);
+    setRelationships(updatedRelationships);
   }, [debouncedQuery]);
 
   const selectedItem = useMemo(() => {
@@ -110,39 +133,6 @@ const SchemaViz: React.FunctionComponent = ({
     return relationships.find((relationship) => relationship.id === selected.id);
   }, [selected, relationships, nodes]);
 
-  // The search and update nodes
-  const handleSearch = useCallback(
-    (value: string) => {
-      const query = value.toLowerCase();
-      const updatedNodes = nodes.map((node) => {
-        if (query === '') {
-          return {
-            ...node,
-            selected: false,
-            size: graphLabels.nodeSize,
-          };
-        }
-        const { id, properties, caption } = node;
-        const propertiesMatch = properties?.id?.toLowerCase().includes(query);
-        const match = id.toLowerCase().includes(query) || propertiesMatch || caption?.toLowerCase().includes(query);
-        return {
-          ...node,
-          selected: match,
-        };
-      });
-      // deactivating any active relationships
-      const updatedRelationships = relationships.map((rel) => {
-        return {
-          ...rel,
-          selected: false,
-        };
-      });
-      setNodes(updatedNodes);
-      setRelationships(updatedRelationships);
-    },
-    [nodes, relationships]
-  );
-
   // Unmounting the component
   if (!open) {
     return <>;
diff --git a/frontend/src/components/Popups/RetryConfirmation/Index.tsx b/frontend/src/components/Popups/RetryConfirmation/Index.tsx
index bea07ca38..dde1c3385 100644
--- a/frontend/src/components/Popups/RetryConfirmation/Index.tsx
+++ b/frontend/src/components/Popups/RetryConfirmation/Index.tsx
@@ -25,7 +25,12 @@ function RetryConfirmationDialog({
 }) {
   const { filesData, setFilesData } = useFileContext();
   const file = filesData.find((c) => c.id === fileId);
-  const RetryOptionsForFile = file?.status != 'Completed' ? RETRY_OPIONS : RETRY_OPIONS.slice(0, 2);
+  const RetryOptionsForFile =
+    file?.status === 'Completed'
+      ? RETRY_OPIONS.filter(
+          (option) => option !== 'start_from_beginning' && option !== 'start_from_last_processed_position'
+        )
+      : RETRY_OPIONS;
   return (
       Reprocess Options
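
Reviewer note: the new guard `retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]` is now repeated in seven places (the six `extract_graph_from_*` entry points plus `get_chunkId_chunkDoc_list`), and its first clause is redundant: an empty or `None` `retry_condition` is already not a member of the two constants, assuming both are non-empty strings. Below is a minimal sketch of factoring the check into one helper; the helper name and the constant values are hypothetical, only the constant names come from the patch.

```python
# Hypothetical helper (not part of this patch) for the guard repeated across
# extract_graph_from_file_local_file, _s3, _web_page, _youtube, _Wikipedia,
# _gcs and get_chunkId_chunkDoc_list. The string values assigned below are
# assumptions made only so the example is runnable.
DELETE_ENTITIES_AND_START_FROM_BEGINNING = "delete_entities_and_start_from_beginning"
START_FROM_LAST_PROCESSED_POSITION = "start_from_last_processed_position"

def needs_full_source_read(retry_condition) -> bool:
    """True when the source must be re-read and re-chunked instead of resuming
    from chunks already stored in the graph. None and "" are covered implicitly,
    since neither is a member of the resume conditions."""
    return retry_condition not in (
        DELETE_ENTITIES_AND_START_FROM_BEGINNING,
        START_FROM_LAST_PROCESSED_POSITION,
    )
```

Each call site would then read `if needs_full_source_read(retry_condition):`, which keeps the seven branches in sync if another retry option is ever added.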
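Reviewer note on `set_status_retry`: the patch zeroes eleven counter/time properties one assignment at a time before optionally running `QUERY_TO_DELETE_EXISTING_ENTITIES`. A hedged sketch of driving the same reset from a single tuple, so a future counter only has to be added in one place; the property names are taken from the patch, while the function and tuple names are hypothetical and `obj_source_node` is assumed to be a plain attribute-bearing object, as it appears to be in the patch.

```python
# Sketch only, not part of the patch: property names copied from set_status_retry.
COUNTERS_TO_RESET = (
    "processed_chunk", "node_count", "relationship_count",
    "chunkNodeCount", "chunkRelCount",
    "communityNodeCount", "communityRelCount",
    "entityNodeCount", "entityEntityRelCount",
    "processingTime", "total_chunks",
)

def reset_document_counters(obj_source_node) -> None:
    """Zero every count/time property before a start-from-beginning retry."""
    for prop in COUNTERS_TO_RESET:
        setattr(obj_source_node, prop, 0)
```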