From c5972777e6350ba44c2a95aa67f90411a96528ec Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Tue, 15 Apr 2025 14:12:33 +0100 Subject: [PATCH 01/15] add json command --- hunting/__main__.py | 21 ++++++ hunting/json.py | 178 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 hunting/json.py diff --git a/hunting/__main__.py b/hunting/__main__.py index 4ce320566a0..ffc2c52a7c1 100644 --- a/hunting/__main__.py +++ b/hunting/__main__.py @@ -16,6 +16,7 @@ from .definitions import HUNTING_DIR from .markdown import MarkdownGenerator +from .json import JSONGenerator from .run import QueryRunner from .search import QueryIndex from .utils import (filter_elasticsearch_params, get_hunt_path, load_all_toml, @@ -51,6 +52,26 @@ def generate_markdown(path: Path = None): # After processing, update the index markdown_generator.update_index_md() +@hunting.command('generate-json') +@click.argument('path', required=False) +def generate_json(path: Path = None): + """Convert TOML hunting queries to JSON format.""" + json_generator = JSONGenerator(HUNTING_DIR) + + if path: + path = Path(path) + if path.is_file() and path.suffix == '.toml': + click.echo(f"Generating JSON for single file: {path}") + json_generator.process_file(path) + elif (HUNTING_DIR / path).is_dir(): + click.echo(f"Generating JSON for folder: {path}") + json_generator.process_folder(path) + else: + raise ValueError(f"Invalid path provided: {path}") + else: + click.echo("Generating JSON for all files.") + json_generator.process_all_files() + @hunting.command('refresh-index') def refresh_index(): diff --git a/hunting/json.py b/hunting/json.py new file mode 100644 index 00000000000..1e12b9f8612 --- /dev/null +++ b/hunting/json.py @@ -0,0 +1,178 @@ +from pathlib import Path +import click +from .definitions import ATLAS_URL, ATTACK_URL, STATIC_INTEGRATION_LINK_MAP, Hunt +from .utils import load_index_file, load_toml, save_index_file, validate_link + +class JSONGenerator: + """Class to generate or update JSON documentation from TOML or YAML files.""" + def __init__(self, base_path: Path): + """Initialize with the base path and load the hunting index.""" + self.base_path = base_path + self.hunting_index = load_index_file() + + def process_file(self, file_path: Path) -> None: + """Process a single TOML file and generate its JSON representation.""" + if not file_path.is_file() or file_path.suffix != '.toml': + raise ValueError(f"The provided path is not a valid TOML file: {file_path}") + + click.echo(f"Processing specific TOML file: {file_path}") + hunt_config = load_toml(file_path) + json_content = self.convert_toml_to_json(hunt_config, file_path) + + docs_folder = self.create_docs_folder(file_path) + json_path = docs_folder / f"{file_path.stem}.json" + self.save_json(json_path, json_content) + + self.update_or_add_entry(hunt_config, file_path) + + def process_folder(self, folder: str) -> None: + """Process all TOML files in a specified folder and generate their JSON representations.""" + folder_path = self.base_path / folder / "queries" + docs_folder = self.base_path / folder / "docs" + + if not folder_path.is_dir() or not docs_folder.is_dir(): + raise ValueError(f"Queries folder {folder_path} or docs folder {docs_folder} does not exist.") + + click.echo(f"Processing all TOML files in folder: {folder_path}") + toml_files = folder_path.rglob("*.toml") + + for toml_file in toml_files: + self.process_file(toml_file) + + def process_all_files(self) -> None: + """Process all TOML files in the base directory 
and subfolders.""" + click.echo("Processing all TOML files in the base directory and subfolders.") + toml_files = self.base_path.rglob("queries/*.toml") + + for toml_file in toml_files: + self.process_file(toml_file) + + def convert_toml_to_json(self, hunt_config: Hunt, file_path: Path) -> dict: + """Convert a Hunt configuration to JSON format.""" + integration_links = self.generate_integration_links(hunt_config.integration) + integration_data = [] + + for integration in hunt_config.integration: + link = None + for link_entry in integration_links: + if integration in link_entry: + link = link_entry.split('(')[1].rstrip(')') + break + + integration_data.append({ + "name": integration, + "link": link + }) + + mitre_data = [] + if hunt_config.mitre: + for tech in hunt_config.mitre: + url = ATLAS_URL if tech.startswith('AML') else ATTACK_URL + if tech.startswith('T'): + url += tech.replace('.', '/') + else: + url += tech + + mitre_data.append({ + "technique": tech, + "url": url + }) + + json_data = { + "name": hunt_config.name, + "metadata": { + "author": hunt_config.author, + "description": hunt_config.description, + "uuid": hunt_config.uuid, + "integrations": integration_data, + "language": str(hunt_config.language).replace("'", "").replace('"', ""), + "source_file": { + "name": hunt_config.name, + "path": (Path('../queries') / file_path.name).as_posix() + } + }, + "queries": hunt_config.query, + "notes": hunt_config.notes if hunt_config.notes else [], + "mitre_techniques": mitre_data, + "references": hunt_config.references if hunt_config.references else [], + "license": hunt_config.license + } + + return json_data + + def save_json(self, json_path: Path, content: dict) -> None: + """Save the JSON content to a file.""" + import json + with open(json_path, 'w', encoding='utf-8') as f: + json.dump(content, f, indent=2, ensure_ascii=False) + click.echo(f"JSON generated: {json_path}") + + def update_or_add_entry(self, hunt_config: Hunt, toml_path: Path) -> None: + """Update or add the entry for a TOML file in the hunting index.""" + folder_name = toml_path.parent.parent.name + uuid = hunt_config.uuid + + entry = { + 'name': hunt_config.name, + 'path': f"./{toml_path.relative_to(self.base_path).as_posix()}", + 'mitre': hunt_config.mitre + } + + if folder_name not in self.hunting_index: + self.hunting_index[folder_name] = {uuid: entry} + else: + self.hunting_index[folder_name][uuid] = entry + + save_index_file(self.base_path, self.hunting_index) + + def create_docs_folder(self, file_path: Path) -> Path: + """Create the docs folder if it doesn't exist and return the path.""" + docs_folder = file_path.parent.parent / "docs" + docs_folder.mkdir(parents=True, exist_ok=True) + return docs_folder + + def generate_integration_links(self, integrations: list[str]) -> list[str]: + """Generate integration links for the documentation.""" + base_url = 'https://docs.elastic.co/integrations' + generated = [] + for integration in integrations: + if integration in STATIC_INTEGRATION_LINK_MAP: + link_str = STATIC_INTEGRATION_LINK_MAP[integration] + else: + link_str = integration.replace('.', '/') + link = f'{base_url}/{link_str}' + validate_link(link) + generated.append(f'[{integration}]({link})') + return generated + + def update_index_json(self) -> None: + """Update the index.json file based on the entries in index.yml.""" + import json + index_file = self.base_path / "index.yml" + + if not index_file.exists(): + click.echo(f"No index.yml found at {index_file}. 
Skipping index.json update.") + return + + index_json = {"categories": []} + + for folder, files in sorted(self.hunting_index.items()): + category = { + "name": folder, + "files": [] + } + + for file_info in sorted(files.values(), key=lambda x: x['name']): + json_path = file_info['path'].replace('queries', 'docs').replace('.toml', '.json') + category["files"].append({ + "name": file_info['name'], + "path": json_path, + "type": "ES|QL" + }) + + index_json["categories"].append(category) + + index_json_path = self.base_path / "index.json" + with open(index_json_path, 'w', encoding='utf-8') as f: + json.dump(index_json, f, indent=2, ensure_ascii=False) + click.echo(f"Index JSON updated at: {index_json_path}") \ No newline at end of file From 6e770f6239cdab645c08023f748b47d243f01944 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Tue, 15 Apr 2025 14:20:18 +0100 Subject: [PATCH 02/15] remove markdown specific files --- hunting/json.py | 51 +++++++++++-------------------------------------- 1 file changed, 11 insertions(+), 40 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index 1e12b9f8612..d24ae6420ff 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -19,8 +19,8 @@ def process_file(self, file_path: Path) -> None: hunt_config = load_toml(file_path) json_content = self.convert_toml_to_json(hunt_config, file_path) - docs_folder = self.create_docs_folder(file_path) - json_path = docs_folder / f"{file_path.stem}.json" + json_folder = self.create_json_folder(file_path) + json_path = json_folder / f"{file_path.stem}.json" self.save_json(json_path, json_content) self.update_or_add_entry(hunt_config, file_path) @@ -28,10 +28,10 @@ def process_file(self, file_path: Path) -> None: def process_folder(self, folder: str) -> None: """Process all TOML files in a specified folder and generate their JSON representations.""" folder_path = self.base_path / folder / "queries" - docs_folder = self.base_path / folder / "docs" + json_folder = self.base_path / folder / "docs" - if not folder_path.is_dir() or not docs_folder.is_dir(): - raise ValueError(f"Queries folder {folder_path} or docs folder {docs_folder} does not exist.") + if not folder_path.is_dir() or not json_folder.is_dir(): + raise ValueError(f"Queries folder {folder_path} or docs folder {json_folder} does not exist.") click.echo(f"Processing all TOML files in folder: {folder_path}") toml_files = folder_path.rglob("*.toml") @@ -49,42 +49,13 @@ def process_all_files(self) -> None: def convert_toml_to_json(self, hunt_config: Hunt, file_path: Path) -> dict: """Convert a Hunt configuration to JSON format.""" - integration_links = self.generate_integration_links(hunt_config.integration) - integration_data = [] - - for integration in hunt_config.integration: - link = None - for link_entry in integration_links: - if integration in link_entry: - link = link_entry.split('(')[1].rstrip(')') - break - - integration_data.append({ - "name": integration, - "link": link - }) - - mitre_data = [] - if hunt_config.mitre: - for tech in hunt_config.mitre: - url = ATLAS_URL if tech.startswith('AML') else ATTACK_URL - if tech.startswith('T'): - url += tech.replace('.', '/') - else: - url += tech - - mitre_data.append({ - "technique": tech, - "url": url - }) - json_data = { "name": hunt_config.name, "metadata": { "author": hunt_config.author, "description": hunt_config.description, "uuid": hunt_config.uuid, - "integrations": integration_data, + "integration": hunt_config.integration, "language": str(hunt_config.language).replace("'", "").replace('"', ""), 
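                # str() on the language list leaves stray quote characters behind; strip them for a clean value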
"source_file": { "name": hunt_config.name, @@ -93,7 +64,7 @@ def convert_toml_to_json(self, hunt_config: Hunt, file_path: Path) -> dict: }, "queries": hunt_config.query, "notes": hunt_config.notes if hunt_config.notes else [], - "mitre_techniques": mitre_data, + "mitre_techniques": hunt_config.mitre, "references": hunt_config.references if hunt_config.references else [], "license": hunt_config.license } @@ -125,11 +96,11 @@ def update_or_add_entry(self, hunt_config: Hunt, toml_path: Path) -> None: save_index_file(self.base_path, self.hunting_index) - def create_docs_folder(self, file_path: Path) -> Path: + def create_json_folder(self, file_path: Path) -> Path: """Create the docs folder if it doesn't exist and return the path.""" - docs_folder = file_path.parent.parent / "docs" - docs_folder.mkdir(parents=True, exist_ok=True) - return docs_folder + json_folder = file_path.parent.parent / "json" + json_folder.mkdir(parents=True, exist_ok=True) + return json_folder def generate_integration_links(self, integrations: list[str]) -> list[str]: """Generate integration links for the documentation.""" From 7c42550bfe63d338607642f9e559c4e0cc614f68 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Tue, 15 Apr 2025 14:28:57 +0100 Subject: [PATCH 03/15] add json files to git ignore --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 2a9bf333790..fe3182a519b 100644 --- a/.gitignore +++ b/.gitignore @@ -115,3 +115,7 @@ exports/ ML-models/ surveys/ machine-learning/ + + +# hunting json output +hunting/*/json/*.json \ No newline at end of file From b39cf822574d37c688172eb27c8233e834b48bd3 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Tue, 15 Apr 2025 14:54:02 +0100 Subject: [PATCH 04/15] add license to the json file --- hunting/json.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hunting/json.py b/hunting/json.py index d24ae6420ff..8b35df2c9e5 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -1,3 +1,8 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. 
+ from pathlib import Path import click from .definitions import ATLAS_URL, ATTACK_URL, STATIC_INTEGRATION_LINK_MAP, Hunt From 24b5cb415b5f6997cbc9d1e022dd4bfcd3ca4b39 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Tue, 15 Apr 2025 15:12:04 +0100 Subject: [PATCH 05/15] remove unused code --- hunting/json.py | 68 +------------------------------------------------ 1 file changed, 1 insertion(+), 67 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index 8b35df2c9e5..40d76e60ad5 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -6,7 +6,7 @@ from pathlib import Path import click from .definitions import ATLAS_URL, ATTACK_URL, STATIC_INTEGRATION_LINK_MAP, Hunt -from .utils import load_index_file, load_toml, save_index_file, validate_link +from .utils import load_index_file, load_toml, validate_link class JSONGenerator: """Class to generate or update JSON documentation from TOML or YAML files.""" @@ -28,8 +28,6 @@ def process_file(self, file_path: Path) -> None: json_path = json_folder / f"{file_path.stem}.json" self.save_json(json_path, json_content) - self.update_or_add_entry(hunt_config, file_path) - def process_folder(self, folder: str) -> None: """Process all TOML files in a specified folder and generate their JSON representations.""" folder_path = self.base_path / folder / "queries" @@ -83,72 +81,8 @@ def save_json(self, json_path: Path, content: dict) -> None: json.dump(content, f, indent=2, ensure_ascii=False) click.echo(f"JSON generated: {json_path}") - def update_or_add_entry(self, hunt_config: Hunt, toml_path: Path) -> None: - """Update or add the entry for a TOML file in the hunting index.""" - folder_name = toml_path.parent.parent.name - uuid = hunt_config.uuid - - entry = { - 'name': hunt_config.name, - 'path': f"./{toml_path.relative_to(self.base_path).as_posix()}", - 'mitre': hunt_config.mitre - } - - if folder_name not in self.hunting_index: - self.hunting_index[folder_name] = {uuid: entry} - else: - self.hunting_index[folder_name][uuid] = entry - - save_index_file(self.base_path, self.hunting_index) - def create_json_folder(self, file_path: Path) -> Path: """Create the docs folder if it doesn't exist and return the path.""" json_folder = file_path.parent.parent / "json" json_folder.mkdir(parents=True, exist_ok=True) return json_folder - - def generate_integration_links(self, integrations: list[str]) -> list[str]: - """Generate integration links for the documentation.""" - base_url = 'https://docs.elastic.co/integrations' - generated = [] - for integration in integrations: - if integration in STATIC_INTEGRATION_LINK_MAP: - link_str = STATIC_INTEGRATION_LINK_MAP[integration] - else: - link_str = integration.replace('.', '/') - link = f'{base_url}/{link_str}' - validate_link(link) - generated.append(f'[{integration}]({link})') - return generated - - def update_index_json(self) -> None: - """Update the index.json file based on the entries in index.yml.""" - import json - index_file = self.base_path / "index.yml" - - if not index_file.exists(): - click.echo(f"No index.yml found at {index_file}. 
Skipping index.json update.")
-            return
-
-        index_json = {"categories": []}
-
-        for folder, files in sorted(self.hunting_index.items()):
-            category = {
-                "name": folder,
-                "files": []
-            }
-
-            for file_info in sorted(files.values(), key=lambda x: x['name']):
-                json_path = file_info['path'].replace('queries', 'docs').replace('.toml', '.json')
-                category["files"].append({
-                    "name": file_info['name'],
-                    "path": json_path,
-                    "type": "ES|QL"
-                })
-
-            index_json["categories"].append(category)
-
-        index_json_path = self.base_path / "index.json"
-        with open(index_json_path, 'w', encoding='utf-8') as f:
-            json.dump(index_json, f, indent=2, ensure_ascii=False)
-        click.echo(f"Index JSON updated at: {index_json_path}")
\ No newline at end of file

From f537ef9e8990094764d9298bbdfe18a5145de308 Mon Sep 17 00:00:00 2001
From: Mark Hopkin
Date: Tue, 15 Apr 2025 16:09:07 +0100
Subject: [PATCH 06/15] bump version

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 86441ff8818..49958ed78fa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "detection_rules"
-version = "1.0.5"
+version = "1.0.6"
 description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
 readme = "README.md"
 requires-python = ">=3.12"

From 38e9cda8055575d457bf39ab13f7fd708872292c Mon Sep 17 00:00:00 2001
From: Mark Hopkin
Date: Wed, 16 Apr 2025 11:41:11 +0100
Subject: [PATCH 07/15] add upload command

---
 hunting/__main__.py |   7 +
 hunting/upload.py   | 318 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 325 insertions(+)
 create mode 100644 hunting/upload.py

diff --git a/hunting/__main__.py b/hunting/__main__.py
index ffc2c52a7c1..46354adcffa 100644
--- a/hunting/__main__.py
+++ b/hunting/__main__.py
@@ -14,6 +14,7 @@

 from detection_rules.misc import parse_user_config

+from .upload import upload_data
 from .definitions import HUNTING_DIR
 from .markdown import MarkdownGenerator
 from .json import JSONGenerator
@@ -28,6 +29,12 @@ def hunting():
     """Commands for managing hunting queries and converting TOML to Markdown."""
     pass

+@hunting.command('upload')
+def upload():
+    """Upload hunting queries to Elasticsearch."""
+    # Delegates to upload_data(), which reads its Elasticsearch connection
+    # settings from the module-level constants in hunting/upload.py.
+ upload_data() + @hunting.command('generate-markdown') @click.argument('path', required=False) diff --git a/hunting/upload.py b/hunting/upload.py new file mode 100644 index 00000000000..127bb1a9390 --- /dev/null +++ b/hunting/upload.py @@ -0,0 +1,318 @@ +import os +import json +import sys +from elasticsearch import Elasticsearch, helpers +from urllib.parse import urlparse + +# Configuration variables (modify these as needed) +ELASTICSEARCH_URL = "http://localhost:9200" # Your Elasticsearch URL +ELASTICSEARCH_USERNAME = "elastic" # Your Elasticsearch username +ELASTICSEARCH_PASSWORD = "changeme" # Your Elasticsearch password +ELASTICSEARCH_INDEX = "threat-hunting-queries" # Target index name +# Directory containing JSON files +DIRECTORY_PATH = "/Users/mark/dev/detection-rules/hunting" +MAPPING = { + "mappings": { + "properties": { + "author": { + "type": "keyword" + }, + "description": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "from": { + "type": "keyword" + }, + "index": { + "type": "keyword" + }, + "language": { + "type": "keyword" + }, + "license": { + "type": "keyword" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "note": { + "type": "text" + }, + "queries": { + "properties": { + "query": { + "type": "text" + }, + "indices": { + "type": "keyword" + } + } + }, + "references": { + "type": "keyword" + }, + "related_integrations": { + "properties": { + "package": { + "type": "keyword" + }, + "version": { + "type": "keyword" + } + } + }, + "required_fields": { + "properties": { + "ecs": { + "type": "boolean" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + } + } + }, + "risk_score": { + "type": "integer" + }, + "rule_id": { + "type": "keyword" + }, + "setup": { + "type": "text" + }, + "severity": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "threat": { + "properties": { + "framework": { + "type": "keyword" + }, + "tactic": { + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "reference": { + "type": "keyword" + } + } + }, + "technique": { + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "reference": { + "type": "keyword" + }, + "subtechnique": { + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "reference": { + "type": "keyword" + } + } + } + } + } + } + }, + "timestamp_override": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "version": { + "type": "integer" + } + } + }, + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1 + } + } +} + + +def find_json_files(directory): + """Recursively find all JSON files in the directory.""" + json_files = [] + for root, _, files in os.walk(directory): + for file in files: + if file.lower().endswith('.json'): + json_files.append(os.path.join(root, file)) + return json_files + + +def read_json_file(file_path): + """Read a JSON file and return its contents.""" + try: + with open(file_path, 'r', encoding='utf-8') as file: + return json.load(file) + except Exception as e: + print(f"Error reading file {file_path}: {e}") + return None + + +def generate_actions(json_files, index_name): + """Generate actions for the bulk API.""" + for file_path in json_files: + data = read_json_file(file_path) + if data: + # Handle both single documents and arrays of documents + if isinstance(data, list): + for item in data: 
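                    # Yielding one action per document keeps memory usage flat for helpers.streaming_bulk.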
                    if isinstance(item, dict):
                        yield {
                            "_index": index_name,
                            "_source": item
                        }
            elif isinstance(data, dict):
                yield {
                    "_index": index_name,
                    "_source": data
                }


def create_index_with_mapping(es_client):
    """
    Create an Elasticsearch index with the mapping defined in the module-level
    MAPPING constant. If the index already exists, it is left untouched.

    Args:
        es_client: Elasticsearch client instance. The target index name comes
            from the module-level ELASTICSEARCH_INDEX constant.

    Returns:
        bool: True if the index was created or already exists, False otherwise
    """
    try:
        if es_client.indices.exists(index=ELASTICSEARCH_INDEX):
            print(f"Index '{ELASTICSEARCH_INDEX}' already exists.")
            return True

        # Create the index with the mapping
        print(f"Creating index '{ELASTICSEARCH_INDEX}' with custom mapping...")
        es_client.indices.create(index=ELASTICSEARCH_INDEX, body=MAPPING)
        print(f"Index '{ELASTICSEARCH_INDEX}' created successfully.")
        return True

    except Exception as e:
        print(f"Error creating index with mapping: {e}")
        return False


def upload_data():
    # Validate configuration
    if not os.path.isdir(DIRECTORY_PATH):
        print(f"Error: Directory '{DIRECTORY_PATH}' does not exist.")
        sys.exit(1)

    # Parse URL to ensure it's valid
    try:
        parsed_url = urlparse(ELASTICSEARCH_URL)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError("Invalid URL format")
    except Exception as e:
        print(f"Error: Invalid Elasticsearch URL: {e}")
        sys.exit(1)

    # Find all JSON files
    json_files = find_json_files(DIRECTORY_PATH)
    if not json_files:
        print(f"No JSON files found in '{DIRECTORY_PATH}'.")
        sys.exit(0)

    print(f"Found {len(json_files)} JSON files to upload.")

    # Connect to Elasticsearch
    try:
        es = Elasticsearch(
            ELASTICSEARCH_URL,
            basic_auth=(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
        )

        # Check if Elasticsearch is available
        if not es.ping():
            raise ConnectionError("Could not connect to Elasticsearch")

        # Check if index exists, create if it doesn't
        if not es.indices.exists(index=ELASTICSEARCH_INDEX):
            print(f"Index '{ELASTICSEARCH_INDEX}' does not exist. 
Creating it...") + es.indices.create(index=ELASTICSEARCH_INDEX) + + except Exception as e: + print(f"Error connecting to Elasticsearch: {e}") + sys.exit(1) + + # Create index with mapping + try: + create_index_with_mapping(es) + except Exception as e: + print(f"Error creating index with mapping: {e}") + sys.exit(1) + + # Upload documents using bulk API + try: + success, failed = 0, 0 + actions = generate_actions(json_files, ELASTICSEARCH_INDEX) + + for ok, result in helpers.streaming_bulk( + es, + actions, + max_retries=3, + yield_ok=True + ): + if ok: + success += 1 + else: + print(f"Error: {result['index']['error']}") + failed += 1 + + # Print progress every 100 documents + if (success + failed) % 100 == 0: + print(f"Progress: {success} succeeded, {failed} failed") + + + print( + f"Upload complete: {success} documents uploaded successfully, {failed} documents failed.") + + except Exception as e: + print(f"Error during bulk upload: {e}") + sys.exit(1) From 11c8838f40d26037402ca24ee3b3d97db42cbe62 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 16 Apr 2025 11:41:25 +0100 Subject: [PATCH 08/15] extract source indices --- hunting/json.py | 49 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/hunting/json.py b/hunting/json.py index 40d76e60ad5..de0a4e06476 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -65,7 +65,7 @@ def convert_toml_to_json(self, hunt_config: Hunt, file_path: Path) -> dict: "path": (Path('../queries') / file_path.name).as_posix() } }, - "queries": hunt_config.query, + "queries": self.format_queries(hunt_config.query), "notes": hunt_config.notes if hunt_config.notes else [], "mitre_techniques": hunt_config.mitre, "references": hunt_config.references if hunt_config.references else [], @@ -73,6 +73,53 @@ def convert_toml_to_json(self, hunt_config: Hunt, file_path: Path) -> dict: } return json_data + + @staticmethod + def extract_indices_from_esql(esql_query): + """ + Extract indices from an ESQL query. + + Args: + esql_query (str): The ESQL query. + + Returns: + list: A list of indices found in the query. + """ + # Normalize whitespace by removing extra spaces and newlines + normalized_query = ' '.join(esql_query.split()) + + # Check if the query starts with "from" + if not normalized_query.lower().startswith('from '): + return [] + + # Extract the part after "from" and before the first pipe (|) + from_part = normalized_query[5:].split('|', 1)[0].strip() + + # Split by commas if multiple indices are provided + indices = [index.strip() for index in from_part.split(',')] + + return indices + + def format_queries(self, queries: list[str]) -> list[dict]: + """ + Format the queries for JSON output. + + Args: + queries (list[str]): List of ESQL queries. + Returns: + list[dict]: List of dictionaries containing the query and its indices. 
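+
+        Example:
+            ["from logs-* | limit 10"] -> [{"query": "from logs-* | limit 10", "indices": ["logs-*"]}]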
+ """ + formatted_queries = [] + + for query in queries: + formatted_queries.append({ + "query": query, + "indices": self.extract_indices_from_esql(query), + }) + + return formatted_queries + + def save_json(self, json_path: Path, content: dict) -> None: """Save the JSON content to a file.""" From ce5c78d7cfac7a2cbaf2db070a06346ad3a17c76 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 16 Apr 2025 20:24:11 +0100 Subject: [PATCH 09/15] simoplify json conversion --- hunting/json.py | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index de0a4e06476..fb6eb5b8bcf 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -3,6 +3,8 @@ # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. +from dataclasses import asdict +import json from pathlib import Path import click from .definitions import ATLAS_URL, ATTACK_URL, STATIC_INTEGRATION_LINK_MAP, Hunt @@ -22,7 +24,7 @@ def process_file(self, file_path: Path) -> None: click.echo(f"Processing specific TOML file: {file_path}") hunt_config = load_toml(file_path) - json_content = self.convert_toml_to_json(hunt_config, file_path) + json_content = self.convert_toml_to_json(hunt_config) json_folder = self.create_json_folder(file_path) json_path = json_folder / f"{file_path.stem}.json" @@ -50,29 +52,12 @@ def process_all_files(self) -> None: for toml_file in toml_files: self.process_file(toml_file) - def convert_toml_to_json(self, hunt_config: Hunt, file_path: Path) -> dict: + def convert_toml_to_json(self, hunt_config: Hunt) -> dict: """Convert a Hunt configuration to JSON format.""" - json_data = { - "name": hunt_config.name, - "metadata": { - "author": hunt_config.author, - "description": hunt_config.description, - "uuid": hunt_config.uuid, - "integration": hunt_config.integration, - "language": str(hunt_config.language).replace("'", "").replace('"', ""), - "source_file": { - "name": hunt_config.name, - "path": (Path('../queries') / file_path.name).as_posix() - } - }, - "queries": self.format_queries(hunt_config.query), - "notes": hunt_config.notes if hunt_config.notes else [], - "mitre_techniques": hunt_config.mitre, - "references": hunt_config.references if hunt_config.references else [], - "license": hunt_config.license - } - return json_data + hunt_config_dict = asdict(hunt_config) + hunt_config_dict["queries"] = self.format_queries(hunt_config_dict["queries"]) + return json.dumps(asdict(hunt_config), indent=4) @staticmethod def extract_indices_from_esql(esql_query): @@ -123,7 +108,6 @@ def format_queries(self, queries: list[str]) -> list[dict]: def save_json(self, json_path: Path, content: dict) -> None: """Save the JSON content to a file.""" - import json with open(json_path, 'w', encoding='utf-8') as f: json.dump(content, f, indent=2, ensure_ascii=False) click.echo(f"JSON generated: {json_path}") From 3ae77429d9248d246679ffac738f64b4efe09f36 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 16 Apr 2025 20:24:11 +0100 Subject: [PATCH 10/15] simoplify json conversion --- hunting/json.py | 78 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index 40d76e60ad5..c1635086df4 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -3,10 +3,12 @@ # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. 
+from dataclasses import asdict +import json from pathlib import Path import click -from .definitions import ATLAS_URL, ATTACK_URL, STATIC_INTEGRATION_LINK_MAP, Hunt -from .utils import load_index_file, load_toml, validate_link +from .definitions import Hunt +from .utils import load_index_file, load_toml class JSONGenerator: """Class to generate or update JSON documentation from TOML or YAML files.""" @@ -22,7 +24,7 @@ def process_file(self, file_path: Path) -> None: click.echo(f"Processing specific TOML file: {file_path}") hunt_config = load_toml(file_path) - json_content = self.convert_toml_to_json(hunt_config, file_path) + json_content = self.convert_toml_to_json(hunt_config) json_folder = self.create_json_folder(file_path) json_path = json_folder / f"{file_path.stem}.json" @@ -50,33 +52,59 @@ def process_all_files(self) -> None: for toml_file in toml_files: self.process_file(toml_file) - def convert_toml_to_json(self, hunt_config: Hunt, file_path: Path) -> dict: + def convert_toml_to_json(self, hunt_config: Hunt) -> dict: """Convert a Hunt configuration to JSON format.""" - json_data = { - "name": hunt_config.name, - "metadata": { - "author": hunt_config.author, - "description": hunt_config.description, - "uuid": hunt_config.uuid, - "integration": hunt_config.integration, - "language": str(hunt_config.language).replace("'", "").replace('"', ""), - "source_file": { - "name": hunt_config.name, - "path": (Path('../queries') / file_path.name).as_posix() - } - }, - "queries": hunt_config.query, - "notes": hunt_config.notes if hunt_config.notes else [], - "mitre_techniques": hunt_config.mitre, - "references": hunt_config.references if hunt_config.references else [], - "license": hunt_config.license - } + return json.dumps(asdict(hunt_config), indent=4) + + @staticmethod + def extract_indices_from_esql(esql_query): + """ + Extract indices from an ESQL query. - return json_data + Args: + esql_query (str): The ESQL query. + + Returns: + list: A list of indices found in the query. + """ + # Normalize whitespace by removing extra spaces and newlines + normalized_query = ' '.join(esql_query.split()) + + # Check if the query starts with "from" + if not normalized_query.lower().startswith('from '): + return [] + + # Extract the part after "from" and before the first pipe (|) + from_part = normalized_query[5:].split('|', 1)[0].strip() + + # Split by commas if multiple indices are provided + indices = [index.strip() for index in from_part.split(',')] + + return indices + + def format_queries(self, queries: list[str]) -> list[dict]: + """ + Format the queries for JSON output. + + Args: + queries (list[str]): List of ESQL queries. + Returns: + list[dict]: List of dictionaries containing the query and its indices. 
+ """ + formatted_queries = [] + + for query in queries: + formatted_queries.append({ + "query": query, + "indices": self.extract_indices_from_esql(query), + }) + + return formatted_queries + + def save_json(self, json_path: Path, content: dict) -> None: """Save the JSON content to a file.""" - import json with open(json_path, 'w', encoding='utf-8') as f: json.dump(content, f, indent=2, ensure_ascii=False) click.echo(f"JSON generated: {json_path}") From a1176227c9cf2ac9d7c8f88de5564e4e14cebd73 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 16 Apr 2025 20:27:25 +0100 Subject: [PATCH 11/15] remove unused methods --- hunting/json.py | 47 ----------------------------------------------- 1 file changed, 47 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index c1635086df4..eb5a78cd5d8 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -55,53 +55,6 @@ def process_all_files(self) -> None: def convert_toml_to_json(self, hunt_config: Hunt) -> dict: """Convert a Hunt configuration to JSON format.""" return json.dumps(asdict(hunt_config), indent=4) - - @staticmethod - def extract_indices_from_esql(esql_query): - """ - Extract indices from an ESQL query. - - Args: - esql_query (str): The ESQL query. - - Returns: - list: A list of indices found in the query. - """ - # Normalize whitespace by removing extra spaces and newlines - normalized_query = ' '.join(esql_query.split()) - - # Check if the query starts with "from" - if not normalized_query.lower().startswith('from '): - return [] - - # Extract the part after "from" and before the first pipe (|) - from_part = normalized_query[5:].split('|', 1)[0].strip() - - # Split by commas if multiple indices are provided - indices = [index.strip() for index in from_part.split(',')] - - return indices - - def format_queries(self, queries: list[str]) -> list[dict]: - """ - Format the queries for JSON output. - - Args: - queries (list[str]): List of ESQL queries. - Returns: - list[dict]: List of dictionaries containing the query and its indices. 
- """ - formatted_queries = [] - - for query in queries: - formatted_queries.append({ - "query": query, - "indices": self.extract_indices_from_esql(query), - }) - - return formatted_queries - - def save_json(self, json_path: Path, content: dict) -> None: """Save the JSON content to a file.""" From 973d2e530051c131bc4006aa3aa9260aae6d5331 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 16 Apr 2025 20:33:58 +0100 Subject: [PATCH 12/15] use correct write method --- hunting/json.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index eb5a78cd5d8..45c8622228e 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -52,14 +52,14 @@ def process_all_files(self) -> None: for toml_file in toml_files: self.process_file(toml_file) - def convert_toml_to_json(self, hunt_config: Hunt) -> dict: + def convert_toml_to_json(self, hunt_config: Hunt) -> str: """Convert a Hunt configuration to JSON format.""" return json.dumps(asdict(hunt_config), indent=4) - def save_json(self, json_path: Path, content: dict) -> None: + def save_json(self, json_path: Path, content: str) -> None: """Save the JSON content to a file.""" with open(json_path, 'w', encoding='utf-8') as f: - json.dump(content, f, indent=2, ensure_ascii=False) + f.write(content) click.echo(f"JSON generated: {json_path}") def create_json_folder(self, file_path: Path) -> Path: From 8397047b596f41f28e7cfc0eff0b908b1429bce6 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 16 Apr 2025 20:41:58 +0100 Subject: [PATCH 13/15] fix queries --- hunting/json.py | 5 +- hunting/upload.py | 213 ++++++++++++++-------------------------------- 2 files changed, 69 insertions(+), 149 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index d09fe5fc33a..912d5da4896 100644 --- a/hunting/json.py +++ b/hunting/json.py @@ -54,7 +54,10 @@ def process_all_files(self) -> None: def convert_toml_to_json(self, hunt_config: Hunt) -> str: """Convert a Hunt configuration to JSON format.""" - return json.dumps(asdict(hunt_config), indent=4) + hunt_config_dict = asdict(hunt_config) + hunt_config_dict["queries"] = self.format_queries(hunt_config_dict["query"]) + hunt_config_dict.pop("query") + return json.dumps(hunt_config_dict, indent=4) @staticmethod def extract_indices_from_esql(esql_query): diff --git a/hunting/upload.py b/hunting/upload.py index 127bb1a9390..90bf9f5c453 100644 --- a/hunting/upload.py +++ b/hunting/upload.py @@ -12,158 +12,75 @@ # Directory containing JSON files DIRECTORY_PATH = "/Users/mark/dev/detection-rules/hunting" MAPPING = { - "mappings": { + "mappings": { + "properties": { + "author": { + "type": "keyword" + }, + "description": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "integration": { + "type": "keyword" + }, + "uuid": { + "type": "keyword" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "language": { + "type": "keyword" + }, + "license": { + "type": "keyword" + }, + "notes": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "mitre": { + "type": "keyword" + }, + "references": { + "type": "keyword" + }, + "queries": { + "type": "nested", "properties": { - "author": { - "type": "keyword" - }, - "description": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "from": { - "type": "keyword" - }, - "index": { - "type": "keyword" - }, - 
"language": { - "type": "keyword" - }, - "license": { - "type": "keyword" - }, - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "note": { - "type": "text" - }, - "queries": { - "properties": { - "query": { - "type": "text" - }, - "indices": { - "type": "keyword" - } - } - }, - "references": { - "type": "keyword" - }, - "related_integrations": { - "properties": { - "package": { - "type": "keyword" - }, - "version": { - "type": "keyword" - } - } - }, - "required_fields": { - "properties": { - "ecs": { - "type": "boolean" - }, - "name": { - "type": "keyword" - }, - "type": { - "type": "keyword" - } - } - }, - "risk_score": { - "type": "integer" - }, - "rule_id": { - "type": "keyword" - }, - "setup": { - "type": "text" - }, - "severity": { - "type": "keyword" - }, - "tags": { - "type": "keyword" - }, - "threat": { - "properties": { - "framework": { - "type": "keyword" - }, - "tactic": { - "properties": { - "id": { - "type": "keyword" - }, - "name": { - "type": "keyword" - }, - "reference": { - "type": "keyword" - } - } - }, - "technique": { - "properties": { - "id": { - "type": "keyword" - }, - "name": { - "type": "keyword" - }, - "reference": { - "type": "keyword" - }, - "subtechnique": { - "properties": { - "id": { - "type": "keyword" - }, - "name": { - "type": "keyword" - }, - "reference": { - "type": "keyword" - } - } - } - } - } - } - }, - "timestamp_override": { - "type": "keyword" - }, - "type": { - "type": "keyword" - }, - "version": { - "type": "integer" + "query": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 8192 + } } + }, + "indices": { + "type": "keyword" + } } - }, - "settings": { - "index": { - "number_of_shards": 1, - "number_of_replicas": 1 - } + } } + } } From c92183b0ab3567cabfb3495fc6d9fbe1aff538f2 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 16 Apr 2025 20:42:06 +0100 Subject: [PATCH 14/15] fux syntax error in query --- ...ccess_rapid_reset_password_requests_for_different_users.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml b/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml index f74cf1453ac..96f65e452d4 100644 --- a/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml +++ b/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml @@ -27,7 +27,7 @@ from logs-okta.system* // Count the number of reset password attempts for each user | stats user_count = count_distinct(user.target.full_name), - reset_counts = by okta.actor.alternate_id, source.user.full_name, okta.debug_context.debug_data.dt_hash + reset_counts = count(*) by okta.actor.alternate_id, source.user.full_name, okta.debug_context.debug_data.dt_hash // Filter for more than 10 unique users and more than 15 reset password attempts by the source | where user_count > 10 and reset_counts > 15 From 3beea5a006828558ea13e01035037fd5c5ac6dc4 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Wed, 23 Apr 2025 17:32:25 +0100 Subject: [PATCH 15/15] extract fields needed for kibana --- hunting/json.py | 86 ++++++++++++++++++++++++++++++++++++++++++++--- hunting/upload.py | 63 +++++++++++++++++++++++++--------- 2 files changed, 129 insertions(+), 20 deletions(-) diff --git a/hunting/json.py b/hunting/json.py index 912d5da4896..ae125be9c4a 100644 --- a/hunting/json.py +++ 
@@ -4,12 +4,16 @@
 # 2.0.

 from dataclasses import asdict
+import datetime
 import json
-from pathlib import Path
+from pathlib import Path, PosixPath
 import click
 from .definitions import Hunt
 from .utils import load_index_file, load_toml
+import re

+now = datetime.datetime.now()
+timestamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

 class JSONGenerator:
     """Class to generate or update JSON documentation from TOML or YAML files."""
     def __init__(self, base_path: Path):
@@ -24,7 +28,7 @@ def process_file(self, file_path: Path) -> None:

         click.echo(f"Processing specific TOML file: {file_path}")
         hunt_config = load_toml(file_path)
-        json_content = self.convert_toml_to_json(hunt_config)
+        json_content = self.convert_toml_to_json(hunt_config, file_path)

         json_folder = self.create_json_folder(file_path)
         json_path = json_folder / f"{file_path.stem}.json"
@@ -56,13 +60,42 @@ def process_all_files(self) -> None:
         for toml_file in toml_files:
             self.process_file(toml_file)

-    def convert_toml_to_json(self, hunt_config: Hunt) -> str:
+    def convert_toml_to_json(self, hunt_config: Hunt, path: str) -> str:
         """Convert a Hunt configuration to JSON format."""
         hunt_config_dict = asdict(hunt_config)
         hunt_config_dict["queries"] = self.format_queries(hunt_config_dict["query"])
         hunt_config_dict.pop("query")
+        hunt_config_dict["category"] = self.path_to_category(path)
+        hunt_config_dict["@timestamp"] = timestamp
         return json.dumps(hunt_config_dict, indent=4)

+    def path_to_category(self, path: PosixPath) -> str:
+        """
+        Convert a file path to a category string.
+
+        Args:
+            path (PosixPath): The file path.
+
+        Returns:
+            str: The category string derived from the file path.
+        """
+        # The category is the directory the queries live in,
+        # e.g. "hunting/windows/queries" -> "windows"

+        # Get the path parts
+        parts = path.parts
+        # Check whether one of the parts is "queries"
+        if "queries" in parts:
+            # Get the index of "queries" in the path
+            queries_index = parts.index("queries")
+            # If "queries" exists and there's a part before it, return that as the category
+            if queries_index > 0:
+                return parts[queries_index - 1]
+
+        # Default fallback: return the parent directory name
+        return path.parent.name
+
     @staticmethod
     def extract_indices_from_esql(esql_query):
         """
         Extract indices from an ESQL query.

         Args:
             esql_query (str): The ESQL query.

         Returns:
             list: A list of indices found in the query.
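
         Example:
             "from logs-okta.system* | where x" -> ["logs-okta.system*"]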
""" + # Handle SELECT statements that start with SELECT instead of FROM + if esql_query.strip().upper().startswith('SELECT'): + # Find the FROM keyword after SELECT + match = re.search(r'FROM\s+([^\s|,;\n]+)', esql_query, re.IGNORECASE) + if match: + return [match.group(1).strip()] + + # For queries that start with FROM directly # Normalize whitespace by removing extra spaces and newlines normalized_query = ' '.join(esql_query.split()) @@ -78,13 +119,49 @@ def extract_indices_from_esql(esql_query): return [] # Extract the part after "from" and before the first pipe (|) - from_part = normalized_query[5:].split('|', 1)[0].strip() + # First remove any inline comments with // + cleaned_query = re.sub(r'//.*$', '', normalized_query, flags=re.MULTILINE) + # Extract text after "from" keyword, then split by pipe, newline, or WHERE + from_part = cleaned_query[5:] # Skip the "from" prefix + # Find the first occurrence of pipe, newline, or "WHERE" (case insensitive) + pipe_pos = from_part.find('|') + newline_pos = from_part.find('\n') + where_pos = re.search(r'WHERE', from_part, re.IGNORECASE) + where_pos = where_pos.start() if where_pos else -1 + + # Find the earliest delimiter (pipe, newline, or WHERE) + positions = [pos for pos in [pipe_pos, newline_pos, where_pos] if pos >= 0] + end_pos = min(positions) if positions else len(from_part) + + from_part = from_part[:end_pos].strip() # Split by commas if multiple indices are provided indices = [index.strip() for index in from_part.split(',')] return indices + def remove_comments_and_blank_lines(self, esql_query): + """ + Remove comments and blank lines from an ESQL query. + + Args: + esql_query (str): The ESQL query. + + Returns: + str: The cleaned ESQL query. + """ + # Remove block comments (/* ... */) + cleaned_query = re.sub(r'/\*.*?\*/', '', esql_query, flags=re.DOTALL) + + # Remove line comments and blank lines + result = [] + for line in cleaned_query.splitlines(): + # Skip comment lines and blank lines + if not line.strip().startswith("//") and line.strip(): + result.append(line) + + return "\n".join(result) + def format_queries(self, queries: list[str]) -> list[dict]: """ Format the queries for JSON output. 
@@ -100,6 +177,7 @@ def format_queries(self, queries: list[str]) -> list[dict]: formatted_queries.append({ "query": query, "indices": self.extract_indices_from_esql(query), + "cleaned_query": self.remove_comments_and_blank_lines(query) }) return formatted_queries diff --git a/hunting/upload.py b/hunting/upload.py index 90bf9f5c453..de787a53e70 100644 --- a/hunting/upload.py +++ b/hunting/upload.py @@ -14,6 +14,12 @@ MAPPING = { "mappings": { "properties": { + "@timestamp": { + "type": "date" + }, + "category": { + "type": "keyword" + }, "author": { "type": "keyword" }, @@ -63,7 +69,6 @@ "type": "keyword" }, "queries": { - "type": "nested", "properties": { "query": { "type": "text", @@ -76,7 +81,16 @@ }, "indices": { "type": "keyword" - } + }, + "cleaned_query": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 8192 + } + } + }, } } } @@ -104,6 +118,22 @@ def read_json_file(file_path): return None +def validate_language(item): + if not isinstance(item, dict): + return False + if "language" not in item: + return True # No language field to validate + languages = item["language"] + if not isinstance(languages, list): + return False + + for lang in languages: + if lang.lower() != "es|ql": + return False + return True + + + def generate_actions(json_files, index_name): """Generate actions for the bulk API.""" for file_path in json_files: @@ -112,18 +142,25 @@ def generate_actions(json_files, index_name): # Handle both single documents and arrays of documents if isinstance(data, list): for item in data: - if isinstance(item, dict): - yield { - "_index": index_name, - "_source": item - } + # Validate the language field + if not validate_language(item): + print(f"Invalid language field in file: {file_path}") + continue + else: + yield { + "_index": index_name, + "_source": item + } elif isinstance(data, dict): + if not validate_language(data): + print(f"Invalid language field in file: {file_path}") + continue + else: yield { - "_index": index_name, - "_source": data + "_index": index_name, + "_source": data } - def create_index_with_mapping(es_client): """ Create an Elasticsearch index with the specified mapping. @@ -188,12 +225,6 @@ def upload_data(): if not es.ping(): raise ConnectionError("Could not connect to Elasticsearch") - # Check if index exists, create if it doesn't - if not es.indices.exists(index=ELASTICSEARCH_INDEX): - print( - f"Index '{ELASTICSEARCH_INDEX}' does not exist. Creating it...") - es.indices.create(index=ELASTICSEARCH_INDEX) - except Exception as e: print(f"Error connecting to Elasticsearch: {e}") sys.exit(1)
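
A quick, illustrative sanity check of the index-extraction logic added in PATCH 15 (a sketch only: it assumes the repository root is on PYTHONPATH and that the final hunting/json.py from this series applies; extract_indices_from_esql is a @staticmethod, so no JSONGenerator instance is required):

    from hunting.json import JSONGenerator

    # FROM-style ES|QL: indices are read up to the first pipe, newline, or WHERE.
    esql = 'from logs-okta.system*, logs-windows.* | where user.name is not null'
    print(JSONGenerator.extract_indices_from_esql(esql))
    # ['logs-okta.system*', 'logs-windows.*']

    # SELECT-style queries fall back to the FROM regex added in PATCH 15.
    print(JSONGenerator.extract_indices_from_esql('SELECT * FROM my-hunt-index'))
    # ['my-hunt-index']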