diff --git a/.github/workflows/python_run_tests.yml b/.github/workflows/python_run_tests.yml new file mode 100644 index 0000000..a3d8ca8 --- /dev/null +++ b/.github/workflows/python_run_tests.yml @@ -0,0 +1,33 @@ +name: Run Unit Test via Pytest + +on: + push: + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12", "3.13"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install poetry + poetry install --with dev + - name: Analysing the code with pylint + run: | + poetry run pylint $(git ls-files '*.py') + continue-on-error: true + - name: Test with pytest + run: | + poetry run coverage run -m pytest -v -s + - name: Generate Coverage Report + run: | + poetry run coverage report -m \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c41924..91485ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ ### Added - Signatures now loaded into memory instead of being saved to disk. This allows for running on read-only filesystems. - Tests for Docker build +- Enhanced deduplication of findings + - The same match should not be returned multiple times within the same scope. E.g. if a token is found in a commit, it should not be returned multiple times in the same commit. 
+- All dates are now converted and logged in UTC +- Unit tests added for models and utils ### Fixed - Error when searching wiki-blobs diff --git a/poetry.lock b/poetry.lock index 1b9fadf..cd53d8c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -416,6 +416,17 @@ autocompletion = ["argcomplete (>=1.10.0,<3)"] graphql = ["gql[httpx] (>=3.5.0,<4)"] yaml = ["PyYaml (>=6.0.1)"] +[[package]] +name = "pytz" +version = "2024.2" +description = "World timezone definitions, modern and historical" +optional = false +python-versions = "*" +files = [ + {file = "pytz-2024.2-py2.py3-none-any.whl", hash = "sha256:31c7c1817eb7fae7ca4b8c7ee50c72f93aa2dd863de768e1ef4245d426aa0725"}, + {file = "pytz-2024.2.tar.gz", hash = "sha256:2aa355083c50a0f93fa581709deac0c9ad65cca8a9e9beac660adcbd493c798a"}, +] + [[package]] name = "pyyaml" version = "6.0.2" @@ -566,4 +577,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "4abeed384a90d9c9ca4132272033774657adfa5d4fd7ee4bf61388ec19bc66fc" +content-hash = "c6cda1a7c9bd412030e28b547095fbbafb515d81f7204ed41d4167e46d0492b1" diff --git a/pyproject.toml b/pyproject.toml index 4a8846f..28ae6de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ colorama = "^0.4.6" pyyaml = "^6.0.2" requests = "^2.32.3" python-gitlab = "^5.0.0" +pytz = "^2024.2" [tool.poetry.group.dev.dependencies] pytest = "^8.3.3" diff --git a/src/gitlab_watchman/__init__.py b/src/gitlab_watchman/__init__.py index d5c5eac..267c1fd 100644 --- a/src/gitlab_watchman/__init__.py +++ b/src/gitlab_watchman/__init__.py @@ -1,23 +1,18 @@ import argparse import calendar +import datetime import multiprocessing import os import sys import time -import datetime import traceback +from dataclasses import dataclass from importlib import metadata from typing import List from gitlab_watchman import watchman_processor +from gitlab_watchman.clients.gitlab_client import GitLabAPIClient from gitlab_watchman.signature_downloader import 
SignatureDownloader -from gitlab_watchman.loggers import JSONLogger, StdoutLogger, log_to_csv -from gitlab_watchman.models import ( - signature, - user, - project, - group -) from gitlab_watchman.exceptions import ( GitLabWatchmanError, GitLabWatchmanGetObjectError, @@ -26,35 +21,55 @@ ElasticsearchMissingError, MissingEnvVarError ) -from gitlab_watchman.clients.gitlab_client import GitLabAPIClient +from gitlab_watchman.loggers import ( + JSONLogger, + StdoutLogger, + log_to_csv, + init_logger +) +from gitlab_watchman.models import ( + signature, + user, + project, + group +) + +@dataclass +class SearchArgs: + """ Dataclass to hold search arguments """ + gitlab_client: GitLabAPIClient + sig_list: List[signature.Signature] + timeframe: int + logging_type: str + log_handler: JSONLogger | StdoutLogger + debug: bool + verbose: bool + scopes: List[str] -def search(gitlab_connection: GitLabAPIClient, - sig: signature.Signature, - timeframe: int, - scope: str, - verbose: bool): + +def search(search_args: SearchArgs, sig: signature.Signature, scope: str): """ Use the appropriate search function to search GitLab based on the contents of the signature file. 
Output results to stdout Args: - gitlab_connection: GitLab API object + search_args: SearchArgs object sig: Signature object - timeframe: Timeframe to search for scope: What sort of GitLab objects to search - verbose: Whether to use verbose logging or not """ try: OUTPUT_LOGGER.log('INFO', f'Searching for {sig.name} in {scope}') results = watchman_processor.search( - gitlab=gitlab_connection, - log_handler=OUTPUT_LOGGER, + gitlab=search_args.gitlab_client, + logging_type=search_args.logging_type, + log_handler=search_args.log_handler, + debug=search_args.debug, sig=sig, scope=scope, - verbose=verbose, - timeframe=timeframe) + verbose=search_args.verbose, + timeframe=search_args.timeframe) if results: for log_data in results: OUTPUT_LOGGER.log( @@ -71,41 +86,18 @@ def search(gitlab_connection: GitLabAPIClient, raise e -def perform_search(gitlab_connection: GitLabAPIClient, - sig_list: List[signature.Signature], - timeframe: int, - verbose_logging: bool, - scopes: List[str]): +def perform_search(search_args: SearchArgs): """ Helper function to perform the search for each signature and each scope Args: - gitlab_connection: GitLab API object - sig_list: List of Signature objects - timeframe: Timeframe to search for - verbose_logging: Whether to use verbose logging or not - scopes: List of scopes to search + search_args: SearchArgs object """ - for sig in sig_list: + for sig in search_args.sig_list: if sig.scope: - for scope in scopes: + for scope in search_args.scopes: if scope in sig.scope: - search(gitlab_connection, sig, timeframe, scope, verbose_logging) - - -def init_logger(logging_type: str, debug: bool) -> JSONLogger | StdoutLogger: - """ Create a logger object. 
Defaults to stdout if no option is given - - Args: - logging_type: Type of logging to use - debug: Whether to use debug level logging or not - Returns: - Logger object - """ - - if not logging_type or logging_type == 'stdout': - return StdoutLogger(debug=debug) - return JSONLogger(debug=debug) + search(search_args, sig, scope) def validate_variables() -> bool: @@ -261,44 +253,62 @@ def main(): 'SUCCESS', f'Projects output to CSV file: {os.path.join(os.getcwd(), "gitlab_projects.csv")}') + search_args = SearchArgs( + gitlab_client=gitlab_client, + sig_list=signature_list, + timeframe=timeframe, + logging_type=logging_type, + log_handler=OUTPUT_LOGGER, + debug=debug, + verbose=verbose, + scopes=[]) + if everything: OUTPUT_LOGGER.log('INFO', 'Getting everything...') - perform_search(gitlab_client, signature_list, timeframe, verbose, - [ - 'blobs', - 'commits', - 'issues', - 'merge_requests', - 'wiki_blobs', - 'milestones', - 'notes', - 'snippet_titles' - ]) + search_args.scopes = [ + 'blobs', + 'commits', + 'issues', + 'merge_requests', + 'wiki_blobs', + 'milestones', + 'notes', + 'snippet_titles' + ] + perform_search(search_args) else: if blobs: OUTPUT_LOGGER.log('INFO', 'Searching blobs') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['blobs']) + search_args.scopes = ['blobs'] + perform_search(search_args) if commits: OUTPUT_LOGGER.log('INFO', 'Searching commits') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['commits']) + search_args.scopes = ['commits'] + perform_search(search_args) if issues: OUTPUT_LOGGER.log('INFO', 'Searching issues') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['issues']) + search_args.scopes = ['issues'] + perform_search(search_args) if merge: OUTPUT_LOGGER.log('INFO', 'Searching merge requests') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['merge_requests']) + search_args.scopes = ['merge_requests'] + perform_search(search_args) if wiki: 
OUTPUT_LOGGER.log('INFO', 'Searching wiki blobs') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['wiki_blobs']) + search_args.scopes = ['wiki_blobs'] + perform_search(search_args) if milestones: OUTPUT_LOGGER.log('INFO', 'Searching milestones') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['milestones']) + search_args.scopes = ['milestones'] + perform_search(search_args) if notes: OUTPUT_LOGGER.log('INFO', 'Searching notes') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['notes']) + search_args.scopes = ['notes'] + perform_search(search_args) if snippets: OUTPUT_LOGGER.log('INFO', 'Searching snippets') - perform_search(gitlab_client, signature_list, timeframe, verbose, ['snippet_titles']) + search_args.scopes = ['snippet_titles'] + perform_search(search_args) OUTPUT_LOGGER.log('SUCCESS', f'GitLab Watchman finished execution - Execution time:' f' {str(datetime.timedelta(seconds=time.time() - start_time))}') diff --git a/src/gitlab_watchman/loggers.py b/src/gitlab_watchman/loggers.py index 238c326..6a753e6 100644 --- a/src/gitlab_watchman/loggers.py +++ b/src/gitlab_watchman/loggers.py @@ -12,6 +12,8 @@ from typing import Any, Dict, List, ClassVar, Protocol from colorama import Fore, Back, Style, init +from gitlab_watchman.utils import EnhancedJSONEncoder + class StdoutLogger: def __init__(self, **kwargs): @@ -102,9 +104,11 @@ def log(self, f' -----' elif scope == 'wiki_blobs': if message.get('project_wiki'): - wiki_path = f'{message.get("project").get("web_url")}/-/wikis/{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}' + wiki_path = (f'{message.get("project").get("web_url")}/-/wikis/' + f'{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}') elif message.get('group_wiki'): - wiki_path = f'{message.get("group").get("web_url")}/-/wikis/{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}' + wiki_path = (f'{message.get("group").get("web_url")}/-/wikis/' + 
f'{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}') else: wiki_path = 'N/A' @@ -260,13 +264,6 @@ def print_header(self) -> None: print(' '.ljust(79) + Fore.GREEN) -class EnhancedJSONEncoder(json.JSONEncoder): - def default(self, o): - if dataclasses.is_dataclass(o): - return dataclasses.asdict(o) - return super().default(o) - - class JSONLogger(Logger): def __init__(self, name: str = 'gitlab_watchman', **kwargs): super().__init__(name) @@ -349,3 +346,18 @@ def log_to_csv(csv_name: str, export_data: List[IsDataclass]) -> None: f.close() except Exception as e: print(e) + + +def init_logger(logging_type: str, debug: bool) -> JSONLogger | StdoutLogger: + """ Create a logger object. Defaults to stdout if no option is given + + Args: + logging_type: Type of logging to use + debug: Whether to use debug level logging or not + Returns: + Logger object + """ + + if not logging_type or logging_type == 'stdout': + return StdoutLogger(debug=debug) + return JSONLogger(debug=debug) \ No newline at end of file diff --git a/src/gitlab_watchman/models/commit.py b/src/gitlab_watchman/models/commit.py index 3d97583..9b0d9e8 100644 --- a/src/gitlab_watchman/models/commit.py +++ b/src/gitlab_watchman/models/commit.py @@ -1,4 +1,7 @@ from dataclasses import dataclass +from datetime import datetime + +from gitlab_watchman.utils import convert_to_utc_datetime @dataclass(slots=True) @@ -6,15 +9,15 @@ class Commit(object): """ Class that defines File objects for GitLab files""" id: str - created_at: str + created_at: datetime | None title: str message: str author_name: str author_email: str - authored_date: str + authored_date: datetime | None committer_name: str committer_email: str - committed_date: str + committed_date: datetime | None web_url: str status: str project_id: str @@ -26,19 +29,19 @@ def create_from_dict(commit_dict: dict) -> Commit: Args: commit_dict: dict/JSON format data from GitLab API Returns: - A new Note object + A new Commit object """ return Commit( 
id=commit_dict.get('id'), - created_at=commit_dict.get('created_at'), + created_at=convert_to_utc_datetime(commit_dict.get('created_at')), title=commit_dict.get('title'), message=commit_dict.get('message'), author_name=commit_dict.get('author_name'), author_email=commit_dict.get('author_email'), - authored_date=commit_dict.get('authored_date'), + authored_date=convert_to_utc_datetime(commit_dict.get('authored_date')), committer_name=commit_dict.get('committer_name'), - committed_date=commit_dict.get('committed_date'), + committed_date=convert_to_utc_datetime(commit_dict.get('committed_date')), committer_email=commit_dict.get('committer_email'), web_url=commit_dict.get('web_url'), status=commit_dict.get('status'), diff --git a/src/gitlab_watchman/models/group.py b/src/gitlab_watchman/models/group.py index 7c143a6..e6c3046 100644 --- a/src/gitlab_watchman/models/group.py +++ b/src/gitlab_watchman/models/group.py @@ -1,4 +1,7 @@ from dataclasses import dataclass +from datetime import datetime + +from gitlab_watchman.utils import convert_to_utc_datetime @dataclass(slots=True) @@ -17,7 +20,7 @@ class Group(object): request_access_enabled: bool full_name: str full_path: str - created_at: str + created_at: datetime | None web_url: str ip_restriction_ranges: str @@ -28,7 +31,7 @@ def create_from_dict(group_dict: dict) -> Group: Args: group_dict: dict/JSON format data from GitLab API Returns: - A new Project object + A new Group object """ return Group( @@ -44,7 +47,7 @@ def create_from_dict(group_dict: dict) -> Group: request_access_enabled=group_dict.get('request_access_enabled'), full_name=group_dict.get('full_name'), full_path=group_dict.get('full_path'), - created_at=group_dict.get('created_at'), + created_at=convert_to_utc_datetime(group_dict.get('created_at')), web_url=group_dict.get('web_url'), ip_restriction_ranges=group_dict.get('ip_restriction_ranges') ) diff --git a/src/gitlab_watchman/models/issue.py b/src/gitlab_watchman/models/issue.py index 2f2ec8c..ce49a22 
100644 --- a/src/gitlab_watchman/models/issue.py +++ b/src/gitlab_watchman/models/issue.py @@ -1,6 +1,8 @@ from dataclasses import dataclass +from datetime import datetime from gitlab_watchman.models import user +from gitlab_watchman.utils import convert_to_utc_datetime @dataclass(slots=True) @@ -13,10 +15,10 @@ class Issue(object): title: str description: str state: str - created_at: str - updated_at: str - closed_by: user.User - closed_at: str + created_at: datetime | None + updated_at: datetime | None + closed_by: user.User | None + closed_at: datetime | None author: str type: str author: user.User @@ -37,6 +39,11 @@ def create_from_dict(issue_dict: dict) -> Issue: else: closed_by = None + if issue_dict.get('author'): + author = user.create_from_dict(issue_dict.get('author')) + else: + author = None + return Issue( id=issue_dict.get('id'), iid=issue_dict.get('iid'), @@ -44,12 +51,12 @@ def create_from_dict(issue_dict: dict) -> Issue: title=issue_dict.get('title'), description=issue_dict.get('description'), state=issue_dict.get('state'), - created_at=issue_dict.get('created_at'), - updated_at=issue_dict.get('updated_at'), + created_at=convert_to_utc_datetime(issue_dict.get('created_at')), + updated_at=convert_to_utc_datetime(issue_dict.get('updated_at')), closed_by=closed_by, - closed_at=issue_dict.get('closed_at'), + closed_at=convert_to_utc_datetime(issue_dict.get('closed_at')), type=issue_dict.get('type'), - author=user.create_from_dict(issue_dict.get('author')), + author=author, confidential=issue_dict.get('confidential'), web_url=issue_dict.get('web_url'), ) diff --git a/src/gitlab_watchman/models/merge_request.py b/src/gitlab_watchman/models/merge_request.py index 77ff4bf..8d4509d 100644 --- a/src/gitlab_watchman/models/merge_request.py +++ b/src/gitlab_watchman/models/merge_request.py @@ -1,6 +1,8 @@ from dataclasses import dataclass +from datetime import datetime from gitlab_watchman.models import user +from gitlab_watchman.utils import 
convert_to_utc_datetime @dataclass(slots=True) @@ -13,13 +15,13 @@ class MergeRequest(object): title: str description: str state: str - created_at: str - updated_at: str + created_at: datetime | None + updated_at: datetime | None merged_by: user.User - merged_at: str + merged_at: datetime | None target_branch: str source_branch: str - author: user.User + author: user.User | None source_project_id: str target_project_id: str merge_status: str @@ -39,6 +41,11 @@ def create_from_dict(mr_dict: dict) -> MergeRequest: else: merged_by = None + if mr_dict.get('author'): + author = user.create_from_dict(mr_dict.get('author')) + else: + author = None + return MergeRequest( id=mr_dict.get('id'), iid=mr_dict.get('iid'), @@ -46,13 +53,13 @@ def create_from_dict(mr_dict: dict) -> MergeRequest: title=mr_dict.get('title'), description=mr_dict.get('description'), state=mr_dict.get('state'), - created_at=mr_dict.get('created_at'), - updated_at=mr_dict.get('updated_at'), + created_at=convert_to_utc_datetime(mr_dict.get('created_at')), + updated_at=convert_to_utc_datetime(mr_dict.get('updated_at')), merged_by=merged_by, - merged_at=mr_dict.get('merged_at'), + merged_at=convert_to_utc_datetime(mr_dict.get('merged_at')), target_branch=mr_dict.get('target_branch'), source_branch=mr_dict.get('source_branch'), - author=user.create_from_dict(mr_dict.get('author')), + author=author, source_project_id=mr_dict.get('source_project_id'), target_project_id=mr_dict.get('target_project_id'), merge_status=mr_dict.get('merge_status'), diff --git a/src/gitlab_watchman/models/milestone.py b/src/gitlab_watchman/models/milestone.py index 79410c6..1f03cd7 100644 --- a/src/gitlab_watchman/models/milestone.py +++ b/src/gitlab_watchman/models/milestone.py @@ -1,4 +1,7 @@ from dataclasses import dataclass +from datetime import datetime + +from gitlab_watchman.utils import convert_to_utc_datetime @dataclass(slots=True) @@ -11,10 +14,10 @@ class Milestone(object): title: str description: str state: str - 
created_at: str - updated_at: str - due_date: str - start_date: str + created_at: datetime | None + updated_at: datetime | None + due_date: datetime | None + start_date: datetime | None expired: str web_url: str @@ -34,10 +37,10 @@ def create_from_dict(milestone_dict: dict) -> Milestone: title=milestone_dict.get('title'), description=milestone_dict.get('description'), state=milestone_dict.get('state'), - created_at=milestone_dict.get('created_at'), - updated_at=milestone_dict.get('updated_at'), - due_date=milestone_dict.get('due_date'), - start_date=milestone_dict.get('start_date'), + created_at=convert_to_utc_datetime(milestone_dict.get('created_at')), + updated_at=convert_to_utc_datetime(milestone_dict.get('updated_at')), + due_date=convert_to_utc_datetime(milestone_dict.get('due_date')), + start_date=convert_to_utc_datetime(milestone_dict.get('start_date')), expired=milestone_dict.get('expired'), web_url=milestone_dict.get('web_url'), project_id=milestone_dict.get('project_id') diff --git a/src/gitlab_watchman/models/note.py b/src/gitlab_watchman/models/note.py index dce2f22..5815670 100644 --- a/src/gitlab_watchman/models/note.py +++ b/src/gitlab_watchman/models/note.py @@ -1,6 +1,8 @@ from dataclasses import dataclass +from datetime import datetime from gitlab_watchman.models import user +from gitlab_watchman.utils import convert_to_utc_datetime @dataclass(slots=True) @@ -12,15 +14,15 @@ class Note(object): body: str attachment: str or bool author: user.User - created_at: str - updated_at: str + created_at: datetime | None + updated_at: datetime | None system: str noteable_id: str noteable_type: str commit_id: str resolvable: bool resolved_by: user.User - resolved_at: str + resolved_at: datetime | None confidential: str noteable_iid: str command_changes: str @@ -39,21 +41,26 @@ def create_from_dict(note_dict: dict) -> Note: else: resolved_by = None + if note_dict.get('author'): + author = user.create_from_dict(note_dict.get('author', {})) + else: + author = 
None + return Note( id=note_dict.get('id'), type=note_dict.get('type'), body=note_dict.get('body'), attachment=note_dict.get('attachment'), - author=user.create_from_dict(note_dict.get('author', {})), - created_at=note_dict.get('created_at'), - updated_at=note_dict.get('updated_at'), + author=author, + created_at=convert_to_utc_datetime(note_dict.get('created_at')), + updated_at=convert_to_utc_datetime(note_dict.get('updated_at')), system=note_dict.get('system'), noteable_id=note_dict.get('noteable_id'), noteable_type=note_dict.get('noteable_type'), commit_id=note_dict.get('commit_id'), resolvable=note_dict.get('resolvable'), resolved_by=resolved_by, - resolved_at=note_dict.get('resolved_at'), + resolved_at=convert_to_utc_datetime(note_dict.get('resolved_at')), confidential=note_dict.get('confidential'), noteable_iid=note_dict.get('noteable_iid'), command_changes=note_dict.get('command_changes'), diff --git a/src/gitlab_watchman/models/project.py b/src/gitlab_watchman/models/project.py index 9e60a0d..ee43365 100644 --- a/src/gitlab_watchman/models/project.py +++ b/src/gitlab_watchman/models/project.py @@ -1,7 +1,9 @@ +import datetime from dataclasses import dataclass from typing import List from gitlab_watchman.models import user +from gitlab_watchman.utils import convert_to_utc_datetime @dataclass(slots=True) @@ -27,9 +29,9 @@ class Project(object): name_with_namespace: str path: str path_with_namespace: str - created_at: str + created_at: datetime.datetime | None web_url: user.User - last_activity_at: str + last_activity_at: datetime.datetime | None namespace: Namespace @@ -48,19 +50,19 @@ def create_from_dict(project_dict: dict) -> Project: name=project_dict.get('name'), name_with_namespace=project_dict.get('name_with_namespace'), path=project_dict.get('path'), - created_at=project_dict.get('created_at'), + created_at=convert_to_utc_datetime(project_dict.get('created_at')), path_with_namespace=project_dict.get('path_with_namespace'), 
web_url=project_dict.get('web_url'), -        last_activity_at=project_dict.get('last_activity_at'), +        last_activity_at=convert_to_utc_datetime(project_dict.get('last_activity_at')), namespace=Namespace( -            id=project_dict.get('namespace').get('id'), -            name=project_dict.get('namespace').get('name'), -            path=project_dict.get('namespace').get('path'), -            kind=project_dict.get('namespace').get('kind'), -            full_path=project_dict.get('namespace').get('full_path'), -            parent_id=project_dict.get('namespace').get('parent_id'), -            web_url=project_dict.get('namespace').get('web_url'), +            id=project_dict.get('namespace', {}).get('id'), +            name=project_dict.get('namespace', {}).get('name'), +            path=project_dict.get('namespace', {}).get('path'), +            kind=project_dict.get('namespace', {}).get('kind'), +            full_path=project_dict.get('namespace', {}).get('full_path'), +            parent_id=project_dict.get('namespace', {}).get('parent_id'), +            web_url=project_dict.get('namespace', {}).get('web_url'), members=[], -            owner=[] +            owner=None ) ) diff --git a/src/gitlab_watchman/models/signature.py b/src/gitlab_watchman/models/signature.py index f4bc430..471d3d3 100644 --- a/src/gitlab_watchman/models/signature.py +++ b/src/gitlab_watchman/models/signature.py @@ -1,70 +1,59 @@ +import datetime from typing import Any, Dict from dataclasses import dataclass +from typing import List @dataclass(slots=True) -class Signature(object): +class TestCases(object): + match_cases: list + fail_cases: list + + +@dataclass(frozen=True, slots=True) +class Signature: """ Class that handles loaded signature objects. Signatures - define what to search for in GitLab and where to search for it. + define what to search for in GitLab and where to search for it. 
They also contain regex patterns to validate data that is found""" name: str - status: bool + status: str author: str - date: str + date: str | datetime.date | datetime.datetime version: str description: str - severity: int - watchman_apps: list - scope: list - test_cases: dataclass - search_strings: str - patterns: str - - -@dataclass(slots=True) -class TestCases(object): - match_cases: list - fail_cases: list + severity: int or str + watchman_apps: Dict[str, Any] + scope: List[str] + test_cases: TestCases + search_strings: List[str] + patterns: List[str] + def __post_init__(self): + if self.name and not isinstance(self.name, str): + raise TypeError(f'Expected `name` to be of type str, received {type(self.name).__name__}') + if self.status and not isinstance(self.status, str): + raise TypeError(f'Expected `status` to be of type str, received {type(self.status).__name__}') + if self.author and not isinstance(self.author, str): + raise TypeError(f'Expected `author` to be of type str, received {type(self.author).__name__}') + if self.date and not (isinstance(self.date, datetime.date) + or isinstance(self.date, str) + or isinstance(self.date, datetime.datetime)): + raise TypeError(f'Expected `date` to be of type str, received {type(self.date).__name__}') + if self.version and not isinstance(self.version, str): + raise TypeError(f'Expected `version` to be of type str, received {type(self.version).__name__}') + if self.description and not isinstance(self.description, str): + raise TypeError(f'Expected `description` to be of type str, received {type(self.description).__name__}') + if self.severity and not (isinstance(self.severity, int) or isinstance(self.severity, str)): + raise TypeError(f'Expected `severity` to be of type int or str, received {type(self.severity).__name__}') + if self.scope and not isinstance(self.scope, list): + raise TypeError(f'Expected `scope` to be of type list, received {type(self.scope).__name__}') + if self.search_strings and not 
isinstance(self.search_strings, list): + raise TypeError( + f'Expected `search_strings` to be of type list, received {type(self.search_strings).__name__}') + if self.patterns and not isinstance(self.patterns, list): + raise TypeError(f'Expected `patterns` to be of type list, received {type(self.patterns).__name__}') -# def load_from_yaml(sig_path: pathlib.PosixPath) -> list[Signature]: -# """Load YAML file and return a Signature object -# -# Args: -# sig_path: Path of YAML file -# Returns: -# Signature object with fields populated from the YAML -# signature file -# """ -# -# with open(sig_path) as yaml_file: -# yaml_import = yaml.safe_load(yaml_file) -# -# output = [] -# for sig in yaml_import.get('signatures'): -# if 'gitlab' in sig.get('watchman_apps'): -# output.append( -# Signature( -# name=sig.get('name'), -# status=sig.get('status'), -# author=sig.get('author'), -# date=sig.get('date'), -# version=sig.get('version'), -# description=sig.get('description'), -# severity=sig.get('severity'), -# watchman_apps=sig.get('watchman_apps'), -# scope=sig.get('watchman_apps').get('gitlab').get('scope'), -# test_cases=TestCases( -# match_cases=sig.get('test_cases').get('match_cases'), -# fail_cases=sig.get('test_cases').get('fail_cases') -# ), -# search_strings=sig.get('watchman_apps').get('gitlab').get('search_strings'), -# patterns=sig.get('patterns') -# ) -# ) -# -# return output def create_from_dict(signature_dict: Dict[str, Any]) -> Signature: """ Create a Signature object from a dictionary @@ -84,11 +73,10 @@ def create_from_dict(signature_dict: Dict[str, Any]) -> Signature: description=signature_dict.get('description'), severity=signature_dict.get('severity'), watchman_apps=signature_dict.get('watchman_apps'), - scope=signature_dict.get('watchman_apps').get('gitlab').get('scope'), + scope=signature_dict.get('watchman_apps', {}).get('gitlab', {}).get('scope'), test_cases=TestCases( - match_cases=signature_dict.get('test_cases').get('match_cases'), - 
fail_cases=signature_dict.get('test_cases').get('fail_cases') + match_cases=signature_dict.get('test_cases', {}).get('match_cases'), + fail_cases=signature_dict.get('test_cases', {}).get('fail_cases') ), - search_strings=signature_dict.get('watchman_apps').get('gitlab').get('search_strings'), - patterns=signature_dict.get('patterns') - ) \ No newline at end of file + search_strings=signature_dict.get('watchman_apps', {}).get('gitlab', {}).get('search_strings'), + patterns=signature_dict.get('patterns')) \ No newline at end of file diff --git a/src/gitlab_watchman/models/snippet.py b/src/gitlab_watchman/models/snippet.py index 00a940e..eb9eff8 100644 --- a/src/gitlab_watchman/models/snippet.py +++ b/src/gitlab_watchman/models/snippet.py @@ -1,6 +1,15 @@ from dataclasses import dataclass +from datetime import datetime +from typing import List from gitlab_watchman.models import user +from gitlab_watchman.utils import convert_to_utc_datetime + + +@dataclass(slots=True) +class File(object): + path: str + raw_url: str @dataclass(slots=True) @@ -11,19 +20,12 @@ class Snippet(object): title: str description: str visibility: str or bool - created_at: str - updated_at: str + created_at: datetime | None + updated_at: datetime | None web_url: str author: user.User file_name: str - files: list - - -@dataclass(slots=True) -class File(object): - - path: str - raw_url: str + files: List[File] def create_from_dict(snip_dict: dict) -> Snippet: @@ -44,14 +46,19 @@ def create_from_dict(snip_dict: dict) -> Snippet: else: file_list = None + if snip_dict.get('author'): + author = user.create_from_dict(snip_dict.get('author')) + else: + author = None + return Snippet( id=snip_dict.get('id'), title=snip_dict.get('title'), description=snip_dict.get('description'), visibility=snip_dict.get('visibility'), - author=user.create_from_dict(snip_dict.get('author', {})), - created_at=snip_dict.get('created_at'), - updated_at=snip_dict.get('updated_at'), + author=author, + 
created_at=convert_to_utc_datetime(snip_dict.get('created_at')), + updated_at=convert_to_utc_datetime(snip_dict.get('updated_at')), web_url=snip_dict.get('web_url'), file_name=snip_dict.get('file_name'), files=file_list diff --git a/src/gitlab_watchman/utils.py b/src/gitlab_watchman/utils.py index ea40f62..51839a7 100644 --- a/src/gitlab_watchman/utils.py +++ b/src/gitlab_watchman/utils.py @@ -1,26 +1,90 @@ -import time import json import dataclasses -from typing import List, Dict +from datetime import datetime +from typing import List, Dict, Any + +import pytz class EnhancedJSONEncoder(json.JSONEncoder): def default(self, o): + if isinstance(o, datetime): + return o.isoformat() if dataclasses.is_dataclass(o): return dataclasses.asdict(o) return super().default(o) -def convert_time(timestamp: str) -> int: - """Convert ISO 8601 timestamp to epoch """ +def convert_to_epoch(timestamp: str | datetime) -> int | None: + """ Convert ISO 8601 formatted strings to int epoch timestamps + + ISO 8601 formatted strings are formatted as: + YYYY-MM-DDTHH:MM:SS.SSS+HH:MM + + Args: + timestamp: ISO 8601 formatted string, example: 2024-01-01T00:00:00.000+00:00 + Returns: + int epoch timestamp + """ + + try: + if isinstance(timestamp, datetime): + return int(timestamp.timestamp()) + else: + pattern = '%Y-%m-%dT%H:%M:%S.%f%z' + return int(datetime.strptime(timestamp, pattern).timestamp()) + except TypeError: + return None + + +def convert_to_utc_datetime(timestamp: str) -> datetime | None: + """ Convert ISO 8601 formatted strings to datetime objects. + Datetimes are returned in UTC. 
+ Accepted inputs: + ISO 8601 Datetime with Timezone: YYYY-MM-DDTHH:MM:SS.SSS+HH:MM + ISO 8601 Datetime without Timezone: YYYY-MM-DDTHH:MM:SS.SSSZ + ISO 8601 Date: YYYY-MM-DD + + Args: + timestamp: ISO 8601 formatted string + Returns: + datetime object + """ + + try: + try: + dt = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%f%z') + return dt.astimezone(pytz.utc) + except ValueError: + try: + dt = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') + return dt.astimezone(pytz.utc) + except ValueError: + return datetime.strptime(timestamp, '%Y-%m-%d') + except TypeError: + return None - pattern = '%Y-%m-%dT%H:%M:%S.%f%z' - return int(time.mktime(time.strptime(timestamp, pattern))) +def convert_to_dict(obj: Any) -> Dict: + """ Returns a dictionary object from a dataclass object or a dict + containing nested dataclass objects. -def deduplicate_results(input_list: List[Dict]) -> List[Dict]: - """ Removes duplicates where results are returned by multiple queries - Nested class handles JSON encoding for dataclass objects + Args: + obj: dataclass object or dict + Returns: + Dictionary object + """ + + json_object = json.dumps(obj, sort_keys=True, cls=EnhancedJSONEncoder) + return json.loads(json_object) + + +def deduplicate_results(input_list: List[Any]) -> List[Dict]: + """ Removes duplicates where results are returned by multiple queries. This is done + using the `watchman_id` field in the detection data to identify the same findings. + + The `watchman_id` is a hash that is generated for each finding from the match string, + meaning the same message won't be returned multiple times. 
Args: input_list: List of dataclass objects @@ -28,12 +92,11 @@ def deduplicate_results(input_list: List[Dict]) -> List[Dict]: List of JSON objects with duplicates removed """ - json_set = {json.dumps(dictionary, sort_keys=True, cls=EnhancedJSONEncoder) for dictionary in input_list} - - return [json.loads(t) for t in json_set] + converted_dict_list = [convert_to_dict(t) for t in input_list] + return list({match.get('watchman_id'): match for match in reversed(converted_dict_list)}.values()) def split_to_chunks(input_list, no_of_chunks): """Split the input list into n amount of chunks""" - return (input_list[i::no_of_chunks] for i in range(no_of_chunks)) \ No newline at end of file + return (input_list[i::no_of_chunks] for i in range(no_of_chunks)) diff --git a/src/gitlab_watchman/watchman_processor.py b/src/gitlab_watchman/watchman_processor.py index 2ae616d..3ea9333 100644 --- a/src/gitlab_watchman/watchman_processor.py +++ b/src/gitlab_watchman/watchman_processor.py @@ -3,14 +3,16 @@ import re import time import traceback +import hashlib +from multiprocessing import Queue from dataclasses import dataclass -from typing import List, Dict +from typing import List, Dict, Optional from requests.exceptions import SSLError from gitlab_watchman.clients.gitlab_client import GitLabAPIClient from gitlab_watchman.exceptions import GitLabWatchmanGetObjectError, GitLabWatchmanAuthenticationError -from gitlab_watchman.loggers import JSONLogger, StdoutLogger +from gitlab_watchman.loggers import JSONLogger, StdoutLogger, init_logger from gitlab_watchman.models import ( signature, note, @@ -26,7 +28,11 @@ project, group ) -from gitlab_watchman.utils import convert_time, deduplicate_results, split_to_chunks +from gitlab_watchman.utils import ( + convert_to_epoch, + deduplicate_results, + split_to_chunks +) ALL_TIME = calendar.timegm(time.gmtime()) + 1576800000 @@ -34,14 +40,14 @@ @dataclass class WorkerArgs: """ Dataclass for multiprocessing arguments """ - gitlab_client: 
GitLabAPIClient search_result_list: List[Dict] regex: re.Pattern[str] timeframe: int results_list: List[Dict] verbose: bool - log_handler: JSONLogger | StdoutLogger + log_queue: Optional[Queue] = None + log_handler: Optional[JSONLogger | StdoutLogger] = None def initiate_gitlab_connection(token: str, @@ -84,102 +90,103 @@ def find_group_owners(group_members: List[Dict]) -> List[Dict]: return member_list -def find_user_owner(user_list: List[Dict]) -> List[Dict]: - """ Return user who owns a namespace - - Args: - user_list: List of users - Returns: - List of formatted users owning a namespace - """ - - owner_list = [] - for user in user_list: - owner_list.append({ - 'user_id': user.get('id'), - 'name': user.get('name'), - 'username': user.get('username'), - 'state': user.get('state') - }) - - return owner_list +def log_listener(log_queue: Queue, logging_type: str, debug: bool): + log_handler = init_logger(logging_type, debug) + while True: + record = log_queue.get() + if record is None: + break + level, message = record + log_handler.log(level, message) def search(gitlab: GitLabAPIClient, - log_handler: StdoutLogger | JSONLogger, + logging_type: str, + log_handler: JSONLogger | StdoutLogger, + debug: bool, sig: signature.Signature, scope: str, verbose: bool, timeframe: int = ALL_TIME) -> List[Dict] | None: - """ Uses the Search API to get search results for the given scope. 
def search(gitlab: GitLabAPIClient,
           logging_type: str,
           log_handler: JSONLogger | StdoutLogger,
           debug: bool,
           sig: signature.Signature,
           scope: str,
           verbose: bool,
           timeframe: int = ALL_TIME) -> List[Dict] | None:
    """ Use the Search API to get search results for the given scope, then
    split them into (cores - 1) chunks that are filtered against the regex
    concurrently by the relevant worker function.

    When JSON logging is used, a dedicated listener process serialises log
    records from all workers via a multiprocessing Queue; otherwise workers
    log directly through the supplied handler.

    Args:
        gitlab: GitLab API object
        logging_type: Logging type in use ('json' uses a queue + listener)
        log_handler: Logger object for outputting results
        debug: Whether debug logging is enabled
        sig: Signature object
        scope: What sort of GitLab objects to search
        verbose: Whether to use verbose logging or not
        timeframe: Timeframe to search in
    Returns:
        List of JSON formatted results if any are found, otherwise None
    """
    results = []
    log_queue = None
    log_process = None
    if logging_type == 'json':
        log_queue = Queue()
        log_process = multiprocessing.Process(target=log_listener,
                                              args=(log_queue, logging_type, debug))
        log_process.start()

    def emit(level: str, message: str) -> None:
        # Route log records through the listener queue (JSON) or straight
        # to the handler, so call sites don't duplicate the branching.
        if log_queue is not None:
            log_queue.put((level, message))
        else:
            log_handler.log(level, message)

    target_func_dict = {
        'blobs': _blob_worker,
        'wiki_blobs': _wiki_blob_worker,
        'commits': _commit_worker,
        'snippet_titles': _snippet_worker,
        'issues': _issue_worker,
        'milestones': _milestone_worker,
        'merge_requests': _merge_request_worker,
        'notes': _note_worker,
    }
    target_func = target_func_dict.get(scope, _blob_worker)

    try:
        for query in sig.search_strings:
            for pattern in sig.patterns:
                regex = re.compile(pattern)
                search_results = gitlab.global_search(query, search_scope=scope)
                query_formatted = query.replace('"', '')
                if not search_results:
                    emit('INFO', f'No {scope} found matching search term: {query_formatted}')
                    continue
                emit('INFO', f'{len(search_results)} {scope} found matching search term: {query_formatted}')

                result = multiprocessing.Manager().list()
                chunks = multiprocessing.cpu_count() - 1
                list_of_chunks = split_to_chunks(search_results, chunks)

                processes = []
                for search_list in list_of_chunks:
                    multipro_args = WorkerArgs(
                        gitlab_client=gitlab,
                        search_result_list=search_list,
                        regex=regex,
                        timeframe=timeframe,
                        results_list=result,
                        verbose=verbose
                    )
                    if log_queue is not None:
                        multipro_args.log_queue = log_queue
                    else:
                        multipro_args.log_handler = log_handler
                    p = multiprocessing.Process(target=target_func, args=(multipro_args,))
                    processes.append(p)
                    p.start()

                for process in processes:
                    process.join()

                results.append(list(result))

        if results:
            results = deduplicate_results([item for sublist in results for item in sublist])
            emit('INFO', f'{len(results)} total matches found after filtering')
            return results
        emit('INFO', 'No matches found after filtering')
        return None
    finally:
        # Always shut the listener down, even if a worker raised - otherwise
        # the listener process is leaked and the parent may hang on exit.
        if log_queue is not None:
            log_queue.put(None)
            log_process.join()
log_queue.put(('INFO', 'No matches found after filtering')) + log_queue.put(None) + log_process.join() + else: + log_handler.log('INFO', 'No matches found after filtering') def _populate_project_owners(gitlab: GitLabAPIClient, @@ -224,30 +231,36 @@ def _blob_worker(args: WorkerArgs) -> List[Dict]: """ now = calendar.timegm(time.gmtime()) - for b in args.search_result_list: + for blob_dict in args.search_result_list: try: - blob_object = blob.create_from_dict(b) + blob_object = blob.create_from_dict(blob_dict) project_object = project.create_from_dict(args.gitlab_client.get_project(blob_object.project_id)) file_object = file.create_from_dict( args.gitlab_client.get_file(blob_object.project_id, blob_object.path, blob_object.ref)) if file_object: commit_object = commit.create_from_dict( args.gitlab_client.get_commit(blob_object.project_id, file_object.commit_id)) - if convert_time(commit_object.committed_date) > (now - args.timeframe) and args.regex.search( + if convert_to_epoch(commit_object.committed_date) > (now - args.timeframe) and args.regex.search( str(blob_object.data)): match_string = args.regex.search(str(blob_object.data)).group(0) if not args.verbose: setattr(blob_object, 'data', None) + watchman_id = hashlib.md5(f'{match_string}.{file_object.file_path}'.encode()).hexdigest() args.results_list.append({ 'match_string': match_string, 'blob': blob_object, 'commit': commit_object, 'project': _populate_project_owners(args.gitlab_client, project_object), - 'file': file_object + 'file': file_object, + 'watchman_id': watchman_id }) except GitLabWatchmanGetObjectError as e: - args.log_handler.log('WARNING', e) - args.log_handler.log('DEBUG', traceback.format_exc()) + if args.log_handler: + args.log_handler.log('WARNING', e) + args.log_handler.log('DEBUG', traceback.format_exc()) + else: + args.log_queue.put('WARNING', e) + args.log_queue.put('DEBUG', traceback.format_exc()) return args.results_list @@ -255,23 +268,23 @@ def _wiki_blob_worker(args: WorkerArgs) -> 
def _wiki_blob_worker(args: WorkerArgs) -> List[Dict]:
    """ MULTIPROCESSING WORKER - Iterates through a list of wiki_blobs to find
    matches against the regex.

    Args:
        args: Multiprocessing arguments containing the GitLab client, search
            list, regex pattern, timeframe, results list, verbosity flag and
            either a log handler or a log queue.
    Returns:
        List[Dict]: Multiprocessing list to be combined by the parent process.
    """
    for wb_dict in args.search_result_list:
        try:
            wikiblob_object = wiki_blob.create_from_dict(wb_dict)
            project_wiki = False
            group_wiki = False
            if wb_dict.get('project_id'):
                project_object = project.create_from_dict(args.gitlab_client.get_project(wb_dict.get('project_id')))
                project_wiki = True
            if wb_dict.get('group_id'):
                group_object = group.create_from_dict(args.gitlab_client.get_group(wb_dict.get('group_id')))
                group_wiki = True

            if args.regex.search(str(wikiblob_object.data)):
                match_string = args.regex.search(str(wikiblob_object.data)).group(0)
                if not args.verbose:
                    setattr(wikiblob_object, 'data', None)
                # Hash of match string + wiki page path identifies this finding
                watchman_id = hashlib.md5(f'{match_string}.{wikiblob_object.path}'.encode()).hexdigest()
                results_dict = {
                    'match_string': match_string,
                    'wiki_blob': wikiblob_object,
                    'group_wiki': group_wiki,
                    'project_wiki': project_wiki,
                    'watchman_id': watchman_id
                }
                if project_wiki:
                    results_dict['project'] = _populate_project_owners(args.gitlab_client, project_object)
                if group_wiki:
                    results_dict['group'] = group_object
                args.results_list.append(results_dict)
        except GitLabWatchmanGetObjectError as e:
            if args.log_handler:
                args.log_handler.log('WARNING', e)
                args.log_handler.log('DEBUG', traceback.format_exc())
            else:
                # BUG FIX: Queue.put takes a single item - send the
                # (level, message) record as one tuple for log_listener.
                args.log_queue.put(('WARNING', e))
                args.log_queue.put(('DEBUG', traceback.format_exc()))
    return args.results_list


def _commit_worker(args: WorkerArgs) -> List[Dict]:
    """ MULTIPROCESSING WORKER - Iterates through a list of commits to find
    matches against the regex.

    Args:
        args: Multiprocessing arguments containing the GitLab client, search
            list, regex pattern, timeframe, results list, verbosity flag and
            either a log handler or a log queue.
    Returns:
        List[Dict]: Multiprocessing list to be combined by the parent process.
    """
    now = calendar.timegm(time.gmtime())
    for commit_dict in args.search_result_list:
        try:
            commit_object = commit.create_from_dict(commit_dict)
            if convert_to_epoch(commit_object.committed_date) > (now - args.timeframe) and \
                    args.regex.search(str(commit_object.message)):
                project_object = project.create_from_dict(args.gitlab_client.get_project(commit_object.project_id))
                match_string = args.regex.search(str(commit_object.message)).group(0)
                # Hash of match string + commit SHA identifies this finding
                watchman_id = hashlib.md5(f'{match_string}.{commit_object.id}'.encode()).hexdigest()
                args.results_list.append({
                    'match_string': match_string,
                    'commit': commit_object,
                    'project': _populate_project_owners(args.gitlab_client, project_object),
                    'watchman_id': watchman_id
                })
        except GitLabWatchmanGetObjectError as e:
            if args.log_handler:
                args.log_handler.log('WARNING', e)
                args.log_handler.log('DEBUG', traceback.format_exc())
            else:
                # BUG FIX: record must be a single tuple (see log_listener)
                args.log_queue.put(('WARNING', e))
                args.log_queue.put(('DEBUG', traceback.format_exc()))
    return args.results_list
def _issue_worker(args: WorkerArgs) -> List[Dict]:
    """ MULTIPROCESSING WORKER - Iterates through a list of issues to find
    matches against the regex.

    Args:
        args: Multiprocessing arguments containing the GitLab client, search
            list, regex pattern, timeframe, results list, verbosity flag and
            either a log handler or a log queue.
    Returns:
        List[Dict]: Multiprocessing list to be combined by the parent process.
    """
    now = calendar.timegm(time.gmtime())
    for issue_dict in args.search_result_list:
        try:
            issue_object = issue.create_from_dict(issue_dict)
            if convert_to_epoch(issue_object.updated_at) > (now - args.timeframe) and \
                    args.regex.search(str(issue_object.description)):
                match_string = args.regex.search(str(issue_object.description)).group(0)
                if not args.verbose:
                    setattr(issue_object, 'description', None)
                project_object = project.create_from_dict(args.gitlab_client.get_project(issue_object.project_id))
                # Hash of match string + issue id identifies this finding
                watchman_id = hashlib.md5(f'{match_string}.{issue_object.id}'.encode()).hexdigest()
                args.results_list.append({
                    'match_string': match_string,
                    'issue': issue_object,
                    'project': _populate_project_owners(args.gitlab_client, project_object),
                    'watchman_id': watchman_id
                })
        except GitLabWatchmanGetObjectError as e:
            if args.log_handler:
                args.log_handler.log('WARNING', e)
                args.log_handler.log('DEBUG', traceback.format_exc())
            else:
                # BUG FIX: Queue.put takes a single item - the (level, message)
                # record must be one tuple, matching what log_listener unpacks.
                args.log_queue.put(('WARNING', e))
                args.log_queue.put(('DEBUG', traceback.format_exc()))
    return args.results_list


def _milestone_worker(args: WorkerArgs) -> List[Dict]:
    """ MULTIPROCESSING WORKER - Iterates through a list of milestones to find
    matches against the regex.

    Args:
        args: Multiprocessing arguments (see _issue_worker).
    Returns:
        List[Dict]: Multiprocessing list to be combined by the parent process.
    """
    now = calendar.timegm(time.gmtime())
    for milestone_dict in args.search_result_list:
        try:
            milestone_object = milestone.create_from_dict(milestone_dict)
            if convert_to_epoch(milestone_object.updated_at) > (now - args.timeframe) and \
                    args.regex.search(str(milestone_object.description)):
                project_object = project.create_from_dict(args.gitlab_client.get_project(milestone_object.project_id))
                match_string = args.regex.search(str(milestone_object.description)).group(0)
                if not args.verbose:
                    setattr(milestone_object, 'description', None)
                watchman_id = hashlib.md5(f'{match_string}.{milestone_object.id}'.encode()).hexdigest()
                args.results_list.append({
                    'match_string': match_string,
                    'milestone': milestone_object,
                    'project': _populate_project_owners(args.gitlab_client, project_object),
                    'watchman_id': watchman_id
                })
        except GitLabWatchmanGetObjectError as e:
            if args.log_handler:
                args.log_handler.log('WARNING', e)
                args.log_handler.log('DEBUG', traceback.format_exc())
            else:
                # BUG FIX: record must be a single tuple (see log_listener)
                args.log_queue.put(('WARNING', e))
                args.log_queue.put(('DEBUG', traceback.format_exc()))
    return args.results_list


def _merge_request_worker(args: WorkerArgs) -> List[Dict]:
    """ MULTIPROCESSING WORKER - Iterates through a list of merge requests to
    find matches against the regex.

    Args:
        args: Multiprocessing arguments (see _issue_worker).
    Returns:
        List[Dict]: Multiprocessing list to be combined by the parent process.
    """
    now = calendar.timegm(time.gmtime())
    for mr_dict in args.search_result_list:
        try:
            mr_object = merge_request.create_from_dict(mr_dict)
            if convert_to_epoch(mr_object.updated_at) > (now - args.timeframe) and \
                    args.regex.search(str(mr_object.description)):
                project_object = project.create_from_dict(args.gitlab_client.get_project(mr_object.project_id))
                match_string = args.regex.search(str(mr_object.description)).group(0)
                if not args.verbose:
                    setattr(mr_object, 'description', None)
                watchman_id = hashlib.md5(f'{match_string}.{mr_object.id}'.encode()).hexdigest()
                args.results_list.append({
                    'match_string': match_string,
                    'merge_request': mr_object,
                    'project': _populate_project_owners(args.gitlab_client, project_object),
                    'watchman_id': watchman_id
                })
        except GitLabWatchmanGetObjectError as e:
            if args.log_handler:
                args.log_handler.log('WARNING', e)
                args.log_handler.log('DEBUG', traceback.format_exc())
            else:
                # BUG FIX: record must be a single tuple (see log_listener)
                args.log_queue.put(('WARNING', e))
                args.log_queue.put(('DEBUG', traceback.format_exc()))
    return args.results_list


def _note_worker(args: WorkerArgs) -> List[Dict]:
    """ MULTIPROCESSING WORKER - Iterates through a list of notes to find
    matches against the regex.

    Args:
        args: Multiprocessing arguments (see _issue_worker).
    Returns:
        List[Dict]: Multiprocessing list to be combined by the parent process.
    """
    now = calendar.timegm(time.gmtime())
    # try moved inside the loop (consistent with the other workers) so one
    # failing note no longer aborts the remaining batch
    for note_dict in args.search_result_list:
        try:
            note_object = note.create_from_dict(note_dict)
            if convert_to_epoch(note_object.created_at) > (now - args.timeframe) and \
                    args.regex.search(str(note_object.body)):
                match_string = args.regex.search(str(note_object.body)).group(0)
                watchman_id = hashlib.md5(f'{match_string}.{note_object.id}'.encode()).hexdigest()
                args.results_list.append({
                    'note': note_object,
                    'match_string': match_string,
                    'watchman_id': watchman_id
                })
        except GitLabWatchmanGetObjectError as e:
            if args.log_handler:
                args.log_handler.log('WARNING', e)
                args.log_handler.log('DEBUG', traceback.format_exc())
            else:
                # BUG FIX: record must be a single tuple (see log_listener)
                args.log_queue.put(('WARNING', e))
                args.log_queue.put(('DEBUG', traceback.format_exc()))
    return args.results_list


def _snippet_worker(args: WorkerArgs) -> List[Dict]:
    """ MULTIPROCESSING WORKER - Iterates through a list of snippets to find
    matches against the regex in the title or description.

    Args:
        args: Multiprocessing arguments (see _issue_worker).
    Returns:
        List[Dict]: Multiprocessing list to be combined by the parent process.
    """
    now = calendar.timegm(time.gmtime())
    for snippet_dict in args.search_result_list:
        try:
            snippet_object = snippet.create_from_dict(snippet_dict)
            if convert_to_epoch(snippet_object.created_at) > (now - args.timeframe) and \
                    (args.regex.search(str(snippet_object.title)) or
                     args.regex.search(str(snippet_object.description))):
                if args.regex.search(str(snippet_object.title)):
                    match_string = args.regex.search(str(snippet_object.title)).group(0)
                else:
                    match_string = args.regex.search(str(snippet_object.description)).group(0)
                if not args.verbose:
                    setattr(snippet_object, 'description', None)
                watchman_id = hashlib.md5(f'{match_string}.{snippet_object.id}'.encode()).hexdigest()
                args.results_list.append({
                    'snippet': snippet_object,
                    'match_string': match_string,
                    'watchman_id': watchman_id
                })
        except GitLabWatchmanGetObjectError as e:
            if args.log_handler:
                args.log_handler.log('WARNING', e)
                args.log_handler.log('DEBUG', traceback.format_exc())
            else:
                # BUG FIX: record must be a single tuple (see log_listener)
                args.log_queue.put(('WARNING', e))
                args.log_queue.put(('DEBUG', traceback.format_exc()))
    return args.results_list
"README.md", + "filename": "README.md", + "id": None, + "ref": "main", + "startline": 46, + "project_id": 6 + } + + MOCK_FILE_DICT = { + "file_name": "key.rb", + "file_path": "app/models/key.rb", + "size": 1476, + "encoding": "base64", + "content": "IyA9PSBTY2hlbWEgSW5mb3...", + "content_sha256": "4c294617b60715c1d218e61164a3abd4808a4284cbc30e6728a01ad9aada4481", + "ref": "main", + "blob_id": "79f7bbd25901e8334750839545a9bd021f0e4c83", + "commit_id": "d5a3ff139356ce33e37e73add446f16869741b50", + "last_commit_id": "570e7b2abdd848b95f2f578043fc23bd6f6fd24d", + "execute_filemode": False + } + + MOCK_GROUP_DICT = { + "id": 4, + "name": "Twitter", + "path": "twitter", + "description": "Aliquid qui quis dignissimos distinctio ut commodi voluptas est.", + "visibility": "public", + "avatar_url": None, + "web_url": "https://gitlab.example.com/groups/twitter", + "request_access_enabled": False, + "repository_storage": "default", + "full_name": "Twitter", + "full_path": "twitter", + "runners_token": "ba324ca7b1c77fc20bb9", + "file_template_project_id": 1, + "parent_id": None, + "enabled_git_access_protocol": "all", + "created_at": "2020-01-15T12:36:29.590Z", + "shared_with_groups": [ + { + "group_id": 28, + "group_name": "H5bp", + "group_full_path": "h5bp", + "group_access_level": 20, + "expires_at": None + } + ], + "prevent_sharing_groups_outside_hierarchy": False, + "ip_restriction_ranges": None, + "math_rendering_limits_enabled": None, + "lock_math_rendering_limits_enabled": None + } + + MOCK_ISSUE_DICT = { + "id": 83, + "iid": 1, + "project_id": 12, + "title": "Add file", + "description": "Add first file", + "state": "opened", + "created_at": "2018-01-24T06:02:15.514Z", + "updated_at": "2018-02-06T12:36:23.263Z", + "closed_at": '2018-02-06T12:36:23.263Z', + "closed_by": { + "id": 1, + "name": "Administrator", + "username": "root", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/e64c7d89f26bd1972efa854d13d7dd61?s=80&d=identicon", + "web_url": 
"http://localhost:3000/root" + }, + "description_html": None, + "description_text": "Add first file", + "labels": [], + "milestone": None, + "assignees": [{ + "id": 20, + "name": "Ceola Deckow", + "username": "sammy.collier", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/c23d85a4f50e0ea76ab739156c639231?s=80&d=identicon", + "web_url": "http://localhost:3000/sammy.collier" + }], + "author": { + "id": 1, + "name": "Administrator", + "username": "root", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/e64c7d89f26bd1972efa854d13d7dd61?s=80&d=identicon", + "web_url": "http://localhost:3000/root" + }, + "assignee": { + "id": 20, + "name": "Ceola Deckow", + "username": "sammy.collier", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/c23d85a4f50e0ea76ab739156c639231?s=80&d=identicon", + "web_url": "http://localhost:3000/sammy.collier" + }, + "user_notes_count": 0, + "upvotes": 0, + "downvotes": 0, + "due_date": None, + "confidential": False, + "discussion_locked": None, + "web_url": "http://localhost:3000/h5bp/7bp/subgroup-prj/issues/1", + "time_stats": { + "time_estimate": 0, + "total_time_spent": 0, + "human_time_estimate": None, + "human_total_time_spent": None + } + } + + MOCK_MERGE_REQUEST_DICT = { + "id": 56, + "iid": 8, + "project_id": 6, + "title": "Add first file", + "description": "This is a test MR to add file", + "state": "opened", + "created_at": "2018-01-22T14:21:50.830Z", + "updated_at": "2018-02-06T12:40:33.295Z", + "target_branch": "main", + "source_branch": "jaja-test", + "upvotes": 0, + "downvotes": 0, + "author": { + "id": 1, + "name": "Administrator", + "username": "root", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/e64c7d89f26bd1972efa854d13d7dd61?s=80&d=identicon", + "web_url": "http://localhost:3000/root" + }, + "assignee": { + "id": 5, + "name": "Jacquelyn Kutch", + "username": "abigail", + "state": "active", + "avatar_url": 
"https://www.gravatar.com/avatar/3138c66095ee4bd11a508c2f7f7772da?s=80&d=identicon", + "web_url": "http://localhost:3000/abigail" + }, + "source_project_id": 6, + "target_project_id": 6, + "labels": [ + "ruby", + "tests" + ], + "draft": False, + "work_in_progress": False, + "milestone": { + "id": 13, + "iid": 3, + "project_id": 6, + "title": "v2.0", + "description": "Qui aut qui eos dolor beatae itaque tempore molestiae.", + "state": "active", + "created_at": "2017-09-05T07:58:29.099Z", + "updated_at": "2017-09-05T07:58:29.099Z", + "due_date": None, + "start_date": None + }, + "merge_when_pipeline_succeeds": False, + "merge_status": "can_be_merged", + "sha": "78765a2d5e0a43585945c58e61ba2f822e4d090b", + "merge_commit_sha": None, + "squash_commit_sha": None, + "user_notes_count": 0, + "discussion_locked": None, + "should_remove_source_branch": None, + "force_remove_source_branch": True, + "web_url": "http://localhost:3000/twitter/flight/merge_requests/8", + "time_stats": { + "time_estimate": 0, + "total_time_spent": 0, + "human_time_estimate": None, + "human_total_time_spent": None + } + } + + MOCK_MILESTONE_DICT = { + "id": 44, + "iid": 1, + "project_id": 12, + "title": "next release", + "description": "Next release milestone", + "state": "active", + "created_at": "2018-02-06T12:43:39.271Z", + "updated_at": "2018-02-06T12:44:01.298Z", + "due_date": "2018-04-18", + "start_date": "2018-02-04" + } + + MOCK_NOTE_DICT = { + "id": 191, + "body": "Harum maxime consequuntur et et deleniti assumenda facilis.", + "attachment": None, + "author": { + "id": 23, + "name": "User 1", + "username": "user1", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/111d68d06e2d317b5a59c2c6c5bad808?s=80&d=identicon", + "web_url": "http://localhost:3000/user1" + }, + "created_at": "2017-09-05T08:01:32.068Z", + "updated_at": "2017-09-05T08:01:32.068Z", + "system": None, + "noteable_id": 22, + "noteable_type": "Issue", + "project_id": 6, + "noteable_iid": 2 + } + + 
MOCK_PROJECT_DICT = { + "id": 3, + "description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.", + "description_html": "

Lorem ipsum dolor sit amet, consectetur adipiscing elit.

", + "default_branch": "main", + "visibility": "private", + "ssh_url_to_repo": "git@example.com:diaspora/diaspora-project-site.git", + "http_url_to_repo": "http://example.com/diaspora/diaspora-project-site.git", + "web_url": "http://example.com/diaspora/diaspora-project-site", + "readme_url": "http://example.com/diaspora/diaspora-project-site/blob/main/README.md", + "tag_list": [ + "example", + "disapora project" + ], + "topics": [ + "example", + "disapora project" + ], + "owner": { + "id": 3, + "name": "Diaspora", + "created_at": "2013-09-30T13:46:02Z" + }, + "name": "Diaspora Project Site", + "name_with_namespace": "Diaspora / Diaspora Project Site", + "path": "diaspora-project-site", + "path_with_namespace": "diaspora/diaspora-project-site", + "issues_enabled": True, + "open_issues_count": 1, + "merge_requests_enabled": True, + "jobs_enabled": True, + "wiki_enabled": True, + "snippets_enabled": False, + "can_create_merge_request_in": True, + "resolve_outdated_diff_discussions": False, + "container_registry_enabled": False, + "container_registry_access_level": "disabled", + "security_and_compliance_access_level": "disabled", + "container_expiration_policy": { + "cadence": "7d", + "enabled": False, + "keep_n": None, + "older_than": None, + "name_regex": None, + "name_regex_delete": None, + "name_regex_keep": None, + "next_run_at": "2020-01-07T21:42:58.658Z" + }, + "created_at": "2013-09-30T13:46:02Z", + "updated_at": "2013-09-30T13:46:02Z", + "last_activity_at": "2013-09-30T13:46:02Z", + "creator_id": 3, + "namespace": { + "id": 3, + "name": "Diaspora", + "path": "diaspora", + "kind": "group", + "full_path": "diaspora", + "avatar_url": "http://localhost:3000/uploads/group/avatar/3/foo.jpg", + "web_url": "http://localhost:3000/groups/diaspora" + }, + "import_url": None, + "import_type": None, + "import_status": "none", + "import_error": None, + "permissions": { + "project_access": { + "access_level": 10, + "notification_level": 3 + }, + "group_access": { + 
"access_level": 50, + "notification_level": 3 + } + }, + "archived": False, + "avatar_url": "http://example.com/uploads/project/avatar/3/uploads/avatar.png", + "license_url": "http://example.com/diaspora/diaspora-client/blob/main/LICENSE", + "license": { + "key": "lgpl-3.0", + "name": "GNU Lesser General Public License v3.0", + "nickname": "GNU LGPLv3", + "html_url": "http://choosealicense.com/licenses/lgpl-3.0/", + "source_url": "http://www.gnu.org/licenses/lgpl-3.0.txt" + }, + "shared_runners_enabled": True, + "group_runners_enabled": True, + "forks_count": 0, + "star_count": 0, + "runners_token": "b8bc4a7a29eb76ea83cf79e4908c2b", + "ci_default_git_depth": 50, + "ci_forward_deployment_enabled": True, + "ci_forward_deployment_rollback_allowed": True, + "ci_allow_fork_pipelines_to_run_in_parent_project": True, + "ci_separated_caches": True, + "ci_restrict_pipeline_cancellation_role": "developer", + "ci_pipeline_variables_minimum_override_role": "maintainer", + "ci_push_repository_for_job_token_allowed": False, + "public_jobs": True, + "shared_with_groups": [ + { + "group_id": 4, + "group_name": "Twitter", + "group_full_path": "twitter", + "group_access_level": 30 + }, + { + "group_id": 3, + "group_name": "Gitlab Org", + "group_full_path": "gitlab-org", + "group_access_level": 10 + } + ], + "repository_storage": "default", + "only_allow_merge_if_pipeline_succeeds": False, + "allow_merge_on_skipped_pipeline": False, + "allow_pipeline_trigger_approve_deployment": False, + "restrict_user_defined_variables": False, + "only_allow_merge_if_all_discussions_are_resolved": False, + "remove_source_branch_after_merge": False, + "printing_merge_requests_link_enabled": True, + "request_access_enabled": False, + "merge_method": "merge", + "squash_option": "default_on", + "auto_devops_enabled": True, + "auto_devops_deploy_strategy": "continuous", + "approvals_before_merge": 0, + "mirror": False, + "mirror_user_id": 45, + "mirror_trigger_builds": False, + 
"only_mirror_protected_branches": False, + "mirror_overwrites_diverged_branches": False, + "external_authorization_classification_label": None, + "packages_enabled": True, + "service_desk_enabled": False, + "service_desk_address": None, + "autoclose_referenced_issues": True, + "suggestion_commit_message": None, + "enforce_auth_checks_on_uploads": True, + "merge_commit_template": None, + "squash_commit_template": None, + "issue_branch_template": "gitlab/%{id}-%{title}", + "marked_for_deletion_at": "2020-04-03", + "marked_for_deletion_on": "2020-04-03", + "compliance_frameworks": ["sox"], + "warn_about_potentially_unwanted_characters": True, + "statistics": { + "commit_count": 37, + "storage_size": 1038090, + "repository_size": 1038090, + "wiki_size": 0, + "lfs_objects_size": 0, + "job_artifacts_size": 0, + "pipeline_artifacts_size": 0, + "packages_size": 0, + "snippets_size": 0, + "uploads_size": 0, + "container_registry_size": 0 + }, + "container_registry_image_prefix": "registry.example.com/diaspora/diaspora-client", + "_links": { + "self": "http://example.com/api/v4/projects", + "issues": "http://example.com/api/v4/projects/1/issues", + "merge_requests": "http://example.com/api/v4/projects/1/merge_requests", + "repo_branches": "http://example.com/api/v4/projects/1/repository_branches", + "labels": "http://example.com/api/v4/projects/1/labels", + "events": "http://example.com/api/v4/projects/1/events", + "members": "http://example.com/api/v4/projects/1/members", + "cluster_agents": "http://example.com/api/v4/projects/1/cluster_agents" + } + } + + MOCK_SNIPPET_DICT = { + "id": 1, + "title": "test", + "file_name": "add.rb", + "description": "Ruby test snippet", + "author": { + "id": 1, + "username": "john_smith", + "email": "john@example.com", + "name": "John Smith", + "state": "active", + "created_at": "2012-05-23T08:00:58Z" + }, + "updated_at": "2012-06-28T10:52:04Z", + "created_at": "2012-06-28T10:52:04Z", + "imported": False, + "imported_from": "none", + 
"project_id": 1, + "web_url": "http://example.com/example/example/snippets/1", + "raw_url": "http://example.com/example/example/snippets/1/raw" + } + + MOCK_USER_DICT = { + "id": 1, + "username": "john_smith", + "name": "John Smith", + "state": "active", + "locked": False, + "avatar_url": "http://localhost:3000/uploads/user/avatar/1/cd8.jpeg", + "web_url": "http://localhost:3000/john_smith" + } + + MOCK_WIKI_BLOB_DICT = { + "basename": "home", + "data": "hello\n\nand bye\n\nend", + "path": "home.md", + "filename": "home.md", + "id": None, + "ref": "main", + "startline": 5, + "project_id": 6, + "group_id": None + } + + MOCK_SIGNATURE_DICT = { + 'name': 'Akamai API Access Tokens', + 'status': 'enabled', + 'author': 'PaperMtn', + 'date': '2023-12-22', + 'description': 'Detects exposed Akamai API Access tokens', + 'severity': '90', + 'notes': None, + 'references': None, + 'watchman_apps': { + 'gitlab': { + 'scope': [ + 'blobs' + ], + 'search_strings': [ + 'akab-' + ] + } + }, + 'test_cases': { + 'match_cases': [ + 'client_token: akab-rWdcwwASNbe9fcGk-00qwecOueticOXxA' + ], + 'fail_cases': [ + 'host: akab-fakehost.akamaiapis.net' + ] + }, + 'patterns': [ + 'akab-[0-9a-zA-Z]{16}-[0-9a-zA-Z]{16}' + ] + } + + +@pytest.fixture +def mock_commit(): + return commit.create_from_dict(GitLabMockData.MOCK_COMMIT_DICT) + + +@pytest.fixture +def mock_blob(): + return blob.create_from_dict(GitLabMockData.MOCK_BLOB_DICT) + + +@pytest.fixture +def mock_file(): + return file.create_from_dict(GitLabMockData.MOCK_FILE_DICT) + + +@pytest.fixture +def mock_group(): + return group.create_from_dict(GitLabMockData.MOCK_GROUP_DICT) + + +@pytest.fixture +def mock_issue(): + return issue.create_from_dict(GitLabMockData.MOCK_ISSUE_DICT) + + +@pytest.fixture +def mock_merge_request(): + return merge_request.create_from_dict(GitLabMockData.MOCK_MERGE_REQUEST_DICT) + + +@pytest.fixture +def mock_milestone(): + return milestone.create_from_dict(GitLabMockData.MOCK_MILESTONE_DICT) + + +@pytest.fixture 
+def mock_note(): + return note.create_from_dict(GitLabMockData.MOCK_NOTE_DICT) + + +@pytest.fixture +def mock_project(): + return project.create_from_dict(GitLabMockData.MOCK_PROJECT_DICT) + + +@pytest.fixture +def mock_snippet(): + return snippet.create_from_dict(GitLabMockData.MOCK_SNIPPET_DICT) + + +@pytest.fixture +def mock_user(): + return user.create_from_dict(GitLabMockData.MOCK_USER_DICT) + + +@pytest.fixture +def mock_wiki_blob(): + return wiki_blob.create_from_dict(GitLabMockData.MOCK_WIKI_BLOB_DICT) + +@pytest.fixture +def mock_signature(): + return signature.create_from_dict(GitLabMockData.MOCK_SIGNATURE_DICT) \ No newline at end of file diff --git a/tests/unit/models/test_unit_blob.py b/tests/unit/models/test_unit_blob.py new file mode 100644 index 0000000..9cb41b0 --- /dev/null +++ b/tests/unit/models/test_unit_blob.py @@ -0,0 +1,40 @@ +from gitlab_watchman.models import blob + +from fixtures import ( + GitLabMockData, + mock_blob +) + + +def test_blob_initialisation(mock_blob): + # Test that the Blob object is of the correct type + assert isinstance(mock_blob, blob.Blob) + + # Test that the Blob object has the correct attributes + assert mock_blob.id == GitLabMockData.MOCK_BLOB_DICT.get('id') + assert mock_blob.basename == GitLabMockData.MOCK_BLOB_DICT.get('basename') + assert mock_blob.data == GitLabMockData.MOCK_BLOB_DICT.get('data') + assert mock_blob.path == GitLabMockData.MOCK_BLOB_DICT.get('path') + assert mock_blob.filename == GitLabMockData.MOCK_BLOB_DICT.get('filename') + assert mock_blob.ref == GitLabMockData.MOCK_BLOB_DICT.get('ref') + assert mock_blob.project_id == GitLabMockData.MOCK_BLOB_DICT.get('project_id') + + +def test_blob_missing_fields(): + # Create dict with missing fields + blob_dict = { + "id": "ed899a2f4b50b4370feeea94676502b42383c746", + "basename": "ed899a2f4b5", + } + blob_object = blob.create_from_dict(blob_dict) + # Test that the Blob object is of the correct type + assert isinstance(blob_object, blob.Blob) + + # Test 
that the Blob object has the correct attributes + assert blob_object.id == blob_dict.get('id') + assert blob_object.basename == blob_dict.get('basename') + assert blob_object.data is None + assert blob_object.path is None + assert blob_object.filename is None + assert blob_object.ref is None + assert blob_object.project_id is None diff --git a/tests/unit/models/test_unit_commit.py b/tests/unit/models/test_unit_commit.py new file mode 100644 index 0000000..219f96f --- /dev/null +++ b/tests/unit/models/test_unit_commit.py @@ -0,0 +1,53 @@ +from gitlab_watchman.models import commit +from gitlab_watchman.utils import convert_to_utc_datetime + +from fixtures import ( + GitLabMockData, + mock_commit ) + + +def test_commit_initialisation(mock_commit): + # Test that the Commit object is of the correct type + assert isinstance(mock_commit, commit.Commit) + + # Test that the Commit object has the correct attributes + assert mock_commit.id == GitLabMockData.MOCK_COMMIT_DICT.get('id') + assert mock_commit.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_COMMIT_DICT.get('created_at')) + assert mock_commit.title == GitLabMockData.MOCK_COMMIT_DICT.get('title') + assert mock_commit.message == GitLabMockData.MOCK_COMMIT_DICT.get('message') + assert mock_commit.author_name == GitLabMockData.MOCK_COMMIT_DICT.get('author_name') + assert mock_commit.author_email == GitLabMockData.MOCK_COMMIT_DICT.get('author_email') + assert mock_commit.authored_date == convert_to_utc_datetime(GitLabMockData.MOCK_COMMIT_DICT.get('authored_date')) + assert mock_commit.committer_name == GitLabMockData.MOCK_COMMIT_DICT.get('committer_name') + assert mock_commit.committer_email == GitLabMockData.MOCK_COMMIT_DICT.get('committer_email') + assert mock_commit.committed_date == convert_to_utc_datetime(GitLabMockData.MOCK_COMMIT_DICT.get('committed_date')) + assert mock_commit.web_url == GitLabMockData.MOCK_COMMIT_DICT.get('web_url') + assert mock_commit.status == 
GitLabMockData.MOCK_COMMIT_DICT.get('status') + assert mock_commit.project_id == GitLabMockData.MOCK_COMMIT_DICT.get('project_id') + + +def test_commit_missing_fields(): + # Create dict with missing fields + commit_dict = { + "id": "ed899a2f4b50b4370feeea94676502b42383c746", + "short_id": "ed899a2f4b5", + } + commit_object = commit.create_from_dict(commit_dict) + # Test that the Commit object is of the correct type + assert isinstance(commit_object, commit.Commit) + + # Test that the Commit object has the correct attributes + assert commit_object.id == commit_dict.get('id') + assert commit_object.created_at is None + assert commit_object.title is None + assert commit_object.message is None + assert commit_object.author_name is None + assert commit_object.author_email is None + assert commit_object.authored_date is None + assert commit_object.committer_name is None + assert commit_object.committer_email is None + assert commit_object.committed_date is None + assert commit_object.web_url is None + assert commit_object.status is None + assert commit_object.project_id is None diff --git a/tests/unit/models/test_unit_file.py b/tests/unit/models/test_unit_file.py new file mode 100644 index 0000000..a07de7d --- /dev/null +++ b/tests/unit/models/test_unit_file.py @@ -0,0 +1,40 @@ +from gitlab_watchman.models import file + +from fixtures import ( + GitLabMockData, + mock_file ) + + +def test_file_initialisation(mock_file): + # Test that the File object is of the correct type + assert isinstance(mock_file, file.File) + + # Test that the File object has the correct attributes + assert mock_file.file_name == GitLabMockData.MOCK_FILE_DICT.get('file_name') + assert mock_file.file_path == GitLabMockData.MOCK_FILE_DICT.get('file_path') + assert mock_file.size == GitLabMockData.MOCK_FILE_DICT.get('size') + assert mock_file.encoding == GitLabMockData.MOCK_FILE_DICT.get('encoding') + assert mock_file.ref == GitLabMockData.MOCK_FILE_DICT.get('ref') + assert 
mock_file.commit_id == GitLabMockData.MOCK_FILE_DICT.get('commit_id') + assert mock_file.last_commit_id == GitLabMockData.MOCK_FILE_DICT.get('last_commit_id') + + +def test_file_missing_fields(): + # Create dict with missing fields + file_dict = { + "file_name": "my_file.txt", + "size": "10", + } + file_object = file.create_from_dict(file_dict) + # Test that the File object is of the correct type + assert isinstance(file_object, file.File) + + # Test that the File object has the correct attributes + assert file_object.file_name == file_dict.get('file_name') + assert file_object.file_path is None + assert file_object.size == file_dict.get('size') + assert file_object.encoding is None + assert file_object.ref is None + assert file_object.commit_id is None + assert file_object.last_commit_id is None \ No newline at end of file diff --git a/tests/unit/models/test_unit_group.py b/tests/unit/models/test_unit_group.py new file mode 100644 index 0000000..067b496 --- /dev/null +++ b/tests/unit/models/test_unit_group.py @@ -0,0 +1,57 @@ +from gitlab_watchman.models import group +from gitlab_watchman.utils import convert_to_utc_datetime + +from fixtures import ( + GitLabMockData, + mock_group +) + + +def test_group_initialisation(mock_group): + # Test that the Group object is of the correct type + assert isinstance(mock_group, group.Group) + + # Test that the Group object has the correct attributes + assert mock_group.id == GitLabMockData.MOCK_GROUP_DICT.get('id') + assert mock_group.name == GitLabMockData.MOCK_GROUP_DICT.get('name') + assert mock_group.path == GitLabMockData.MOCK_GROUP_DICT.get('path') + assert mock_group.description == GitLabMockData.MOCK_GROUP_DICT.get('description') + assert mock_group.visibility == GitLabMockData.MOCK_GROUP_DICT.get('visibility') + assert mock_group.require_two_factor_authentication == GitLabMockData.MOCK_GROUP_DICT.get('require_two_factor_authentication') + assert mock_group.two_factor_grace_period == 
GitLabMockData.MOCK_GROUP_DICT.get('two_factor_grace_period') + assert mock_group.auto_devops_enabled == GitLabMockData.MOCK_GROUP_DICT.get('auto_devops_enabled') + assert mock_group.emails_disabled == GitLabMockData.MOCK_GROUP_DICT.get('emails_disabled') + assert mock_group.request_access_enabled == GitLabMockData.MOCK_GROUP_DICT.get('request_access_enabled') + assert mock_group.full_name == GitLabMockData.MOCK_GROUP_DICT.get('full_name') + assert mock_group.full_path == GitLabMockData.MOCK_GROUP_DICT.get('full_path') + assert mock_group.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_GROUP_DICT.get('created_at')) + assert mock_group.web_url == GitLabMockData.MOCK_GROUP_DICT.get('web_url') + assert mock_group.ip_restriction_ranges == GitLabMockData.MOCK_GROUP_DICT.get('ip_restriction_ranges') + + +def test_group_missing_fields(): + # Create dict with missing fields + group_dict = { + "id": "ed899a2f4b50b4370feeea94676502b42383c746", + "name": "my_group", + } + group_object = group.create_from_dict(group_dict) + # Test that the Group object is of the correct type + assert isinstance(group_object, group.Group) + + # Test that the Group object has the correct attributes + assert group_object.id == group_dict.get('id') + assert group_object.name == group_dict.get('name') + assert group_object.path is None + assert group_object.description is None + assert group_object.visibility is None + assert group_object.require_two_factor_authentication is None + assert group_object.two_factor_grace_period is None + assert group_object.auto_devops_enabled is None + assert group_object.emails_disabled is None + assert group_object.request_access_enabled is None + assert group_object.full_name is None + assert group_object.full_path is None + assert group_object.created_at is None + assert group_object.web_url is None + assert group_object.ip_restriction_ranges is None \ No newline at end of file diff --git a/tests/unit/models/test_unit_issue.py 
b/tests/unit/models/test_unit_issue.py new file mode 100644 index 0000000..2532553 --- /dev/null +++ b/tests/unit/models/test_unit_issue.py @@ -0,0 +1,70 @@ +from gitlab_watchman.models import issue, user +from gitlab_watchman.utils import convert_to_utc_datetime + +from fixtures import ( + GitLabMockData, + mock_issue +) + + +def test_issue_initialisation(mock_issue): + # Test that the Issue object is of the correct type + assert isinstance(mock_issue, issue.Issue) + + # Test that the Issue object has the correct attributes + assert mock_issue.id == GitLabMockData.MOCK_ISSUE_DICT.get('id') + assert mock_issue.iid == GitLabMockData.MOCK_ISSUE_DICT.get('iid') + assert mock_issue.project_id == GitLabMockData.MOCK_ISSUE_DICT.get('project_id') + assert mock_issue.title == GitLabMockData.MOCK_ISSUE_DICT.get('title') + assert mock_issue.description == GitLabMockData.MOCK_ISSUE_DICT.get('description') + assert mock_issue.state == GitLabMockData.MOCK_ISSUE_DICT.get('state') + assert mock_issue.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_ISSUE_DICT.get('created_at')) + assert mock_issue.updated_at == convert_to_utc_datetime(GitLabMockData.MOCK_ISSUE_DICT.get('updated_at')) + assert mock_issue.closed_by == user.create_from_dict(GitLabMockData.MOCK_ISSUE_DICT.get('closed_by')) + assert mock_issue.closed_at == convert_to_utc_datetime(GitLabMockData.MOCK_ISSUE_DICT.get('closed_at')) + assert mock_issue.type == GitLabMockData.MOCK_ISSUE_DICT.get('type') + assert mock_issue.author == user.create_from_dict( + GitLabMockData.MOCK_ISSUE_DICT.get('author')), GitLabMockData.MOCK_ISSUE_DICT.get('author') + assert mock_issue.confidential == GitLabMockData.MOCK_ISSUE_DICT.get('confidential') + assert mock_issue.web_url == GitLabMockData.MOCK_ISSUE_DICT.get('web_url') + + +def test_issues_missing_fields(): + # Create dict with missing fields + issue_dict = { + "id": "ed899a2f4b50b4370feeea94676502b42383c746", + "iid": "1", + } + issue_object = 
issue.create_from_dict(issue_dict) + # Test that the Issue object is of the correct type + assert isinstance(issue_object, issue.Issue) + + # Test that the Issue object has the correct attributes + assert issue_object.id == issue_dict.get('id') + assert issue_object.iid == issue_dict.get('iid') + assert issue_object.project_id is None + assert issue_object.title is None + assert issue_object.description is None + assert issue_object.state is None + assert issue_object.created_at is None + assert issue_object.updated_at is None + assert issue_object.closed_by is None + assert issue_object.closed_at is None + assert issue_object.type is None + assert issue_object.author is None + assert issue_object.confidential is None + assert issue_object.web_url is None + + +def test_issue_user_initialisation(mock_issue): + # Test creating a user object with the response from the GitLab API + + # Test that the User object is of the correct type + assert isinstance(mock_issue.author, user.User) + + # Test that the User object has the correct attributes + assert mock_issue.author.id == GitLabMockData.MOCK_ISSUE_DICT.get('author').get('id') + assert mock_issue.author.name == GitLabMockData.MOCK_ISSUE_DICT.get('author').get('name') + assert mock_issue.author.username == GitLabMockData.MOCK_ISSUE_DICT.get('author').get('username') + assert mock_issue.author.state == GitLabMockData.MOCK_ISSUE_DICT.get('author').get('state') + assert mock_issue.author.web_url == GitLabMockData.MOCK_ISSUE_DICT.get('author').get('web_url') diff --git a/tests/unit/models/test_unit_merge_request.py b/tests/unit/models/test_unit_merge_request.py new file mode 100644 index 0000000..81eaeb7 --- /dev/null +++ b/tests/unit/models/test_unit_merge_request.py @@ -0,0 +1,88 @@ +from gitlab_watchman.models import merge_request, user +from gitlab_watchman.utils import convert_to_utc_datetime + +from fixtures import ( + GitLabMockData, + mock_merge_request +) + + +def 
test_merge_request_initialisation(mock_merge_request): + # Test that the MergeRequest object is of the correct type + assert isinstance(mock_merge_request, merge_request.MergeRequest) + + # Test that the MergeRequest object has the correct attributes + assert mock_merge_request.id == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('id') + assert mock_merge_request.iid == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('iid') + assert mock_merge_request.project_id == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('project_id') + assert mock_merge_request.title == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('title') + assert mock_merge_request.description == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('description') + assert mock_merge_request.state == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('state') + assert mock_merge_request.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('created_at')) + assert mock_merge_request.updated_at == convert_to_utc_datetime(GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('updated_at')) + assert mock_merge_request.merged_by is None + assert mock_merge_request.merged_at == convert_to_utc_datetime(GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('merged_at')) + assert mock_merge_request.target_branch == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('target_branch') + assert mock_merge_request.source_branch == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('source_branch') + assert isinstance(mock_merge_request.author, user.User) + assert mock_merge_request.source_project_id == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('source_project_id') + assert mock_merge_request.target_project_id == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('target_project_id') + assert mock_merge_request.merge_status == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('merge_status') + assert mock_merge_request.web_url == GitLabMockData.MOCK_MERGE_REQUEST_DICT.get('web_url') + + +def test_merge_request_missing_fields(): + # Create dict with missing fields + 
merge_request_dict = { + "id": "ed899a2f4b50b4370feeea94676502b42383c746", + "iid": "1", + } + merge_request_object = merge_request.create_from_dict(merge_request_dict) + # Test that the MergeRequest object is of the correct type + assert isinstance(merge_request_object, merge_request.MergeRequest) + + # Test that the MergeRequest object has the correct attributes + assert merge_request_object.id == merge_request_dict.get('id') + assert merge_request_object.iid == merge_request_dict.get('iid') + assert merge_request_object.project_id is None + assert merge_request_object.title is None + assert merge_request_object.description is None + assert merge_request_object.state is None + assert merge_request_object.created_at is None + assert merge_request_object.updated_at is None + assert merge_request_object.merged_by is None + assert merge_request_object.merged_at is None + assert merge_request_object.target_branch is None + assert merge_request_object.source_branch is None + assert merge_request_object.author is None + assert merge_request_object.source_project_id is None + assert merge_request_object.target_project_id is None + assert merge_request_object.merge_status is None + assert merge_request_object.web_url is None + + +def test_initialisation_with_merged_by_user(mock_merge_request): + # Create dict and add merged_by user + merge_request_dict = GitLabMockData.MOCK_MERGE_REQUEST_DICT.copy() + merge_request_dict['merged_by'] = { + "id": 1, + "name": "Administrator", + "username": "root", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/e64c7d89f26bd1972efa854d13d7dd61?s=80&d=identicon", + "web_url": "http://localhost:3000/root" + } + merge_request_object = merge_request.create_from_dict(merge_request_dict) + + # Test that the MergeRequest object is of the correct type + assert isinstance(merge_request_object, merge_request.MergeRequest) + + # Test that the MergeRequest object has the correct attributes + assert 
isinstance(merge_request_object.merged_by, user.User) + + # Test that the User object is of the correct type + assert merge_request_object.merged_by.id == 1 + assert merge_request_object.merged_by.name == 'Administrator' + assert merge_request_object.merged_by.username == 'root' + assert merge_request_object.merged_by.state == 'active' + assert merge_request_object.merged_by.web_url == 'http://localhost:3000/root' diff --git a/tests/unit/models/test_unit_milestone.py b/tests/unit/models/test_unit_milestone.py new file mode 100644 index 0000000..2993b10 --- /dev/null +++ b/tests/unit/models/test_unit_milestone.py @@ -0,0 +1,51 @@ +from gitlab_watchman.models import milestone +from gitlab_watchman.utils import convert_to_utc_datetime + +from fixtures import ( + GitLabMockData, + mock_milestone +) + + +def test_milestone_initialisation(mock_milestone): + # Test that the Milestone object is of the correct type + assert isinstance(mock_milestone, milestone.Milestone) + + # Test that the Milestone object has the correct attributes + assert mock_milestone.id == GitLabMockData.MOCK_MILESTONE_DICT.get('id') + assert mock_milestone.iid == GitLabMockData.MOCK_MILESTONE_DICT.get('iid') + assert mock_milestone.project_id == GitLabMockData.MOCK_MILESTONE_DICT.get('project_id') + assert mock_milestone.title == GitLabMockData.MOCK_MILESTONE_DICT.get('title') + assert mock_milestone.description == GitLabMockData.MOCK_MILESTONE_DICT.get('description') + assert mock_milestone.state == GitLabMockData.MOCK_MILESTONE_DICT.get('state') + assert mock_milestone.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_MILESTONE_DICT.get('created_at')) + assert mock_milestone.updated_at == convert_to_utc_datetime(GitLabMockData.MOCK_MILESTONE_DICT.get('updated_at')) + assert mock_milestone.due_date == convert_to_utc_datetime(GitLabMockData.MOCK_MILESTONE_DICT.get('due_date')) + assert mock_milestone.start_date == convert_to_utc_datetime(GitLabMockData.MOCK_MILESTONE_DICT.get('start_date')) 
+ assert mock_milestone.expired == GitLabMockData.MOCK_MILESTONE_DICT.get('expired') + assert mock_milestone.web_url == GitLabMockData.MOCK_MILESTONE_DICT.get('web_url') + + +def test_milestone_missing_fields(): + # Create dict with missing fields + milestone_dict = { + "id": "ed899a2f4b50b4370feeea94676502b42383c746", + "iid": "1", + } + milestone_object = milestone.create_from_dict(milestone_dict) + # Test that the Milestone object is of the correct type + assert isinstance(milestone_object, milestone.Milestone) + + # Test that the Milestone object has the correct attributes + assert milestone_object.id == milestone_dict.get('id') + assert milestone_object.iid == milestone_dict.get('iid') + assert milestone_object.project_id is None + assert milestone_object.title is None + assert milestone_object.description is None + assert milestone_object.state is None + assert milestone_object.created_at is None + assert milestone_object.updated_at is None + assert milestone_object.due_date is None + assert milestone_object.start_date is None + assert milestone_object.expired is None + assert milestone_object.web_url is None \ No newline at end of file diff --git a/tests/unit/models/test_unit_note.py b/tests/unit/models/test_unit_note.py new file mode 100644 index 0000000..0a4b498 --- /dev/null +++ b/tests/unit/models/test_unit_note.py @@ -0,0 +1,89 @@ +from gitlab_watchman.models import note, user +from gitlab_watchman.utils import convert_to_utc_datetime + +from fixtures import ( + GitLabMockData, + mock_note +) + + +def test_note_initialisation(mock_note): + # Test that the Note object is of the correct type + assert isinstance(mock_note, note.Note) + + # Test that the Note object has the correct attributes + assert mock_note.id == GitLabMockData.MOCK_NOTE_DICT.get('id') + assert mock_note.type == GitLabMockData.MOCK_NOTE_DICT.get('type') + assert mock_note.body == GitLabMockData.MOCK_NOTE_DICT.get('body') + assert mock_note.attachment == 
GitLabMockData.MOCK_NOTE_DICT.get('attachment') + assert isinstance(mock_note.author, user.User) + assert mock_note.author.id == GitLabMockData.MOCK_NOTE_DICT.get('author').get('id') + assert mock_note.author.name == GitLabMockData.MOCK_NOTE_DICT.get('author').get('name') + assert mock_note.author.username == GitLabMockData.MOCK_NOTE_DICT.get('author').get('username') + assert mock_note.author.state == GitLabMockData.MOCK_NOTE_DICT.get('author').get('state') + assert mock_note.author.web_url == GitLabMockData.MOCK_NOTE_DICT.get('author').get('web_url') + assert mock_note.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_NOTE_DICT.get('created_at')) + assert mock_note.updated_at == convert_to_utc_datetime(GitLabMockData.MOCK_NOTE_DICT.get('updated_at')) + assert mock_note.system == GitLabMockData.MOCK_NOTE_DICT.get('system') + assert mock_note.noteable_id == GitLabMockData.MOCK_NOTE_DICT.get('noteable_id') + assert mock_note.noteable_type == GitLabMockData.MOCK_NOTE_DICT.get('noteable_type') + assert mock_note.commit_id == GitLabMockData.MOCK_NOTE_DICT.get('commit_id') + assert mock_note.resolvable == GitLabMockData.MOCK_NOTE_DICT.get('resolvable') + assert mock_note.resolved_by is None + assert mock_note.resolved_at == convert_to_utc_datetime(GitLabMockData.MOCK_NOTE_DICT.get('resolved_at')) + assert mock_note.confidential == GitLabMockData.MOCK_NOTE_DICT.get('confidential') + assert mock_note.noteable_iid == GitLabMockData.MOCK_NOTE_DICT.get('noteable_iid') + assert mock_note.command_changes == GitLabMockData.MOCK_NOTE_DICT.get('command_changes') + + +def test_note_missing_fields(): + # Create dict with missing fields + note_dict = { + "id": 1, + "type": "note", + } + note_object = note.create_from_dict(note_dict) + # Test that the Note object is of the correct type + assert isinstance(note_object, note.Note) + + # Test that the Note object has the correct attributes + assert note_object.id == note_dict.get('id') + assert note_object.type == 
note_dict.get('type') + assert note_object.body is None + assert note_object.attachment is None + assert note_object.author is None + assert note_object.created_at is None + assert note_object.updated_at is None + assert note_object.system is None + assert note_object.noteable_id is None + assert note_object.noteable_type is None + assert note_object.commit_id is None + assert note_object.resolvable is None + assert note_object.resolved_by is None + assert note_object.resolved_at is None + assert note_object.confidential is None + assert note_object.noteable_iid is None + assert note_object.command_changes is None + + +def test_note_with_resolved_by(): + # Test that the Note object is of the correct type + note_dict = GitLabMockData.MOCK_NOTE_DICT.copy() + note_dict['resolved_by'] = { + "id": 1, + "name": "Administrator", + "username": "root", + "state": "active", + "avatar_url": "https://www.gravatar.com/avatar/e64c7d89f26bd1972efa854d13d7dd61?s=80&d=identicon", + "web_url": "http://localhost:3000/root" + } + note_object = note.create_from_dict(note_dict) + + # Test that the Note object has the correct attributes + assert isinstance(note_object, note.Note) + assert isinstance(note_object.resolved_by, user.User) + assert note_object.resolved_by.id == note_dict.get('resolved_by').get('id') + assert note_object.resolved_by.name == note_dict.get('resolved_by').get('name') + assert note_object.resolved_by.username == note_dict.get('resolved_by').get('username') + assert note_object.resolved_by.state == note_dict.get('resolved_by').get('state') + assert note_object.resolved_by.web_url == note_dict.get('resolved_by').get('web_url') \ No newline at end of file diff --git a/tests/unit/models/test_unit_project.py b/tests/unit/models/test_unit_project.py new file mode 100644 index 0000000..8ab63c5 --- /dev/null +++ b/tests/unit/models/test_unit_project.py @@ -0,0 +1,69 @@ +from gitlab_watchman.models import project +from gitlab_watchman.utils import convert_to_utc_datetime + 
+from fixtures import ( + GitLabMockData, + mock_project +) + + +def test_project_initialisation(mock_project): + # Test that the Project object is of the correct type + assert isinstance(mock_project, project.Project) + + # Test that the Project object has the correct attributes + assert mock_project.id == GitLabMockData.MOCK_PROJECT_DICT.get('id') + assert mock_project.description == GitLabMockData.MOCK_PROJECT_DICT.get('description') + assert mock_project.name == GitLabMockData.MOCK_PROJECT_DICT.get('name') + assert mock_project.name_with_namespace == GitLabMockData.MOCK_PROJECT_DICT.get('name_with_namespace') + assert mock_project.path == GitLabMockData.MOCK_PROJECT_DICT.get('path') + assert mock_project.path_with_namespace == GitLabMockData.MOCK_PROJECT_DICT.get('path_with_namespace') + assert mock_project.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_PROJECT_DICT.get('created_at')) + assert mock_project.web_url == GitLabMockData.MOCK_PROJECT_DICT.get('web_url') + assert mock_project.last_activity_at == convert_to_utc_datetime(GitLabMockData.MOCK_PROJECT_DICT.get('last_activity_at')) + + # Test that the Namespace object is of the correct type + assert isinstance(mock_project.namespace, project.Namespace) + + # Test that the Namespace object has the correct attributes + assert mock_project.namespace.id == GitLabMockData.MOCK_PROJECT_DICT.get('namespace').get('id') + assert mock_project.namespace.name == GitLabMockData.MOCK_PROJECT_DICT.get('namespace').get('name') + assert mock_project.namespace.path == GitLabMockData.MOCK_PROJECT_DICT.get('namespace').get('path') + assert mock_project.namespace.web_url == GitLabMockData.MOCK_PROJECT_DICT.get('namespace').get('web_url') + assert mock_project.namespace.kind == GitLabMockData.MOCK_PROJECT_DICT.get('namespace').get('kind') + assert mock_project.namespace.full_path == GitLabMockData.MOCK_PROJECT_DICT.get('namespace').get('full_path') + assert mock_project.namespace.parent_id == 
GitLabMockData.MOCK_PROJECT_DICT.get('namespace').get('parent_id') + + +def test_project_missing_fields(): + # Create a dict with missing fields + project_dict = { + "id": "1", + "description": "Test project", + } + project_object = project.create_from_dict(project_dict) + # Test that the Project object is of the correct type + assert isinstance(project_object, project.Project) + + # Test that the Project object has the correct attributes + assert project_object.id == project_dict.get('id') + assert project_object.description == project_dict.get('description') + assert project_object.name is None + assert project_object.name_with_namespace is None + assert project_object.path is None + assert project_object.path_with_namespace is None + assert project_object.created_at is None + assert project_object.web_url is None + assert project_object.last_activity_at is None + + # Test that the Namespace object is of the correct type + assert isinstance(project_object.namespace, project.Namespace) + + # Test that the Namespace object has the correct attributes + assert project_object.namespace.id is None + assert project_object.namespace.name is None + assert project_object.namespace.path is None + assert project_object.namespace.web_url is None + assert project_object.namespace.kind is None + assert project_object.namespace.full_path is None + assert project_object.namespace.parent_id is None diff --git a/tests/unit/models/test_unit_signature.py b/tests/unit/models/test_unit_signature.py new file mode 100644 index 0000000..d8c2273 --- /dev/null +++ b/tests/unit/models/test_unit_signature.py @@ -0,0 +1,99 @@ +import pytest +import copy + +from gitlab_watchman.models import signature +from fixtures import GitLabMockData, mock_signature + + +def test_signature_initialisation(mock_signature): + # Test that the signature object is initialised + assert isinstance(mock_signature, signature.Signature) + + # Test that the signature object has the correct attributes + assert 
mock_signature.name == GitLabMockData.MOCK_SIGNATURE_DICT.get('name') + assert mock_signature.status == GitLabMockData.MOCK_SIGNATURE_DICT.get('status') + assert mock_signature.author == GitLabMockData.MOCK_SIGNATURE_DICT.get('author') + assert mock_signature.date == GitLabMockData.MOCK_SIGNATURE_DICT.get('date') + assert mock_signature.description == GitLabMockData.MOCK_SIGNATURE_DICT.get('description') + assert mock_signature.severity == GitLabMockData.MOCK_SIGNATURE_DICT.get('severity') + assert mock_signature.watchman_apps == GitLabMockData.MOCK_SIGNATURE_DICT.get('watchman_apps') + assert mock_signature.scope == GitLabMockData.MOCK_SIGNATURE_DICT.get('watchman_apps').get('gitlab').get('scope') + + +def test_field_type(): + # Test that correct error is raised when name is not a string + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['name'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when status is not a string + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['status'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when author is not a string + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['author'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when date is not a string + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['date'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when severity is not a string or int + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['severity'] = 5.0 + with pytest.raises(TypeError): 
+ test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when watchman_apps is not a dict + signature_dict_temp = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict_temp['watchman_apps'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict_temp)  # fixed: was signature_dict (stale dict from the severity case) + + # Test that correct error is raised when scope is not a list + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['watchman_apps']['gitlab']['scope'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when search_strings is not a list + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['watchman_apps']['gitlab']['search_strings'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when patterns is not a list + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['patterns'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when version is not a string + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['version'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + # Test that correct error is raised when description is not a string + signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + signature_dict['description'] = 123 + with pytest.raises(TypeError): + test_signature = signature.create_from_dict(signature_dict) + + +def test_missing_field(): + temp_signature_dict = copy.deepcopy(GitLabMockData.MOCK_SIGNATURE_DICT) + del temp_signature_dict['name'] + test_signature = signature.create_from_dict(temp_signature_dict) + assert test_signature.name is None + + 
del temp_signature_dict['watchman_apps'] + test_signature = signature.create_from_dict(temp_signature_dict) + assert test_signature.watchman_apps is None diff --git a/tests/unit/models/test_unit_snippet.py b/tests/unit/models/test_unit_snippet.py new file mode 100644 index 0000000..196e324 --- /dev/null +++ b/tests/unit/models/test_unit_snippet.py @@ -0,0 +1,94 @@ +from gitlab_watchman.models import snippet, user +from gitlab_watchman.utils import convert_to_utc_datetime + +from fixtures import ( + GitLabMockData, + mock_snippet +) + + +def test_snippet_initialisation(mock_snippet): + # Test that the Snippet object is of the correct type + assert isinstance(mock_snippet, snippet.Snippet) + + # Test that the Snippet object has the correct attributes + assert mock_snippet.id == GitLabMockData.MOCK_SNIPPET_DICT.get('id') + assert mock_snippet.title == GitLabMockData.MOCK_SNIPPET_DICT.get('title') + assert mock_snippet.description == GitLabMockData.MOCK_SNIPPET_DICT.get('description') + assert mock_snippet.visibility == GitLabMockData.MOCK_SNIPPET_DICT.get('visibility') + assert mock_snippet.created_at == convert_to_utc_datetime(GitLabMockData.MOCK_SNIPPET_DICT.get('created_at')) + assert mock_snippet.updated_at == convert_to_utc_datetime(GitLabMockData.MOCK_SNIPPET_DICT.get('updated_at')) + assert mock_snippet.file_name == GitLabMockData.MOCK_SNIPPET_DICT.get('file_name') + assert mock_snippet.web_url == GitLabMockData.MOCK_SNIPPET_DICT.get('web_url') + + assert mock_snippet.files is None + assert isinstance(mock_snippet.author, user.User) + + assert mock_snippet.author.id == GitLabMockData.MOCK_SNIPPET_DICT.get('author').get('id') + assert mock_snippet.author.name == GitLabMockData.MOCK_SNIPPET_DICT.get('author').get('name') + assert mock_snippet.author.username == GitLabMockData.MOCK_SNIPPET_DICT.get('author').get('username') + assert mock_snippet.author.state == GitLabMockData.MOCK_SNIPPET_DICT.get('author').get('state') + assert mock_snippet.author.web_url == 
GitLabMockData.MOCK_SNIPPET_DICT.get('author').get('web_url') + + +def test_snippet_missing_fields(): + # Create a dict with missing fields + snippet_dict = { + "id": "1", + "title": "Test snippet", + } + snippet_object = snippet.create_from_dict(snippet_dict) + # Test that the Snippet object is of the correct type + assert isinstance(snippet_object, snippet.Snippet) + + # Test that the Snippet object has the correct attributes + assert snippet_object.id == snippet_dict.get('id') + assert snippet_object.title == snippet_dict.get('title') + assert snippet_object.description is None + assert snippet_object.visibility is None + assert snippet_object.created_at is None + assert snippet_object.updated_at is None + assert snippet_object.file_name is None + assert snippet_object.web_url is None + + assert snippet_object.author is None + assert snippet_object.files is None + + +def test_snippet_file_initialisation(): + # Test creating with one file + snippet_dict_one = GitLabMockData.MOCK_SNIPPET_DICT.copy() + snippet_dict_one['files'] = [ + { + 'path': 'README.md', + 'raw_url': 'https://gitlab.com/test/test/-/blob/master/README.md' + } + ] + snippet_object_one = snippet.create_from_dict(snippet_dict_one) + assert isinstance(snippet_object_one.files, list) + assert len(snippet_object_one.files) == 1 + assert isinstance(snippet_object_one.files[0], snippet.File) + assert snippet_object_one.files[0].path == 'README.md' + assert snippet_object_one.files[0].raw_url == 'https://gitlab.com/test/test/-/blob/master/README.md' + + # Test creating with multiple files + snippet_dict_two = GitLabMockData.MOCK_SNIPPET_DICT.copy() + snippet_dict_two['files'] = [ + { + 'path': 'README.md', + 'raw_url': 'https://gitlab.com/test/test/-/blob/master/README.md' + }, + { + 'path': 'LICENSE', + 'raw_url': 'https://gitlab.com/test/test/-/blob/master/LICENSE' + } + ] + snippet_object_two = snippet.create_from_dict(snippet_dict_two) + assert isinstance(snippet_object_two.files, list) + assert 
len(snippet_object_two.files) == 2 + assert isinstance(snippet_object_two.files[0], snippet.File) + assert isinstance(snippet_object_two.files[1], snippet.File) + assert snippet_object_two.files[0].path == 'README.md' + assert snippet_object_two.files[0].raw_url == 'https://gitlab.com/test/test/-/blob/master/README.md' + assert snippet_object_two.files[1].path == 'LICENSE' + assert snippet_object_two.files[1].raw_url == 'https://gitlab.com/test/test/-/blob/master/LICENSE' diff --git a/tests/unit/models/test_unit_user.py b/tests/unit/models/test_unit_user.py new file mode 100644 index 0000000..3f4d6d0 --- /dev/null +++ b/tests/unit/models/test_unit_user.py @@ -0,0 +1,36 @@ +from gitlab_watchman.models import user + +from fixtures import ( + GitLabMockData, + mock_user +) + + +def test_user_initialisation(mock_user): + # Test that the User object is of the correct type + assert isinstance(mock_user, user.User) + + # Test that the User object has the correct attributes + assert mock_user.id == GitLabMockData.MOCK_USER_DICT.get('id') + assert mock_user.name == GitLabMockData.MOCK_USER_DICT.get('name') + assert mock_user.username == GitLabMockData.MOCK_USER_DICT.get('username') + assert mock_user.state == GitLabMockData.MOCK_USER_DICT.get('state') + assert mock_user.web_url == GitLabMockData.MOCK_USER_DICT.get('web_url') + + +def test_user_missing_fields(): + # Create a user object with missing fields + user_dict = { + "id": "1", + "name": "Test user", + } + + # Test that the User object is of the correct type + assert isinstance(user.create_from_dict(user_dict), user.User) + + # Test that the User object has the correct attributes + assert user.create_from_dict(user_dict).id == user_dict.get('id') + assert user.create_from_dict(user_dict).name == user_dict.get('name') + assert user.create_from_dict(user_dict).username is None + assert user.create_from_dict(user_dict).state is None + assert user.create_from_dict(user_dict).web_url is None diff --git 
a/tests/unit/models/test_unit_wiki_blob.py b/tests/unit/models/test_unit_wiki_blob.py new file mode 100644 index 0000000..d9dc417 --- /dev/null +++ b/tests/unit/models/test_unit_wiki_blob.py @@ -0,0 +1,38 @@ +from gitlab_watchman.models import wiki_blob +from fixtures import GitLabMockData, mock_wiki_blob + + +def test_wiki_blob_initialisation(mock_wiki_blob): + # Test that the WikiBlob object is of the correct type + assert isinstance(mock_wiki_blob, wiki_blob.WikiBlob) + + # Test that the WikiBlob object has the correct attributes + assert mock_wiki_blob.id == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('id') + assert mock_wiki_blob.basename == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('basename') + assert mock_wiki_blob.data == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('data') + assert mock_wiki_blob.path == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('path') + assert mock_wiki_blob.filename == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('filename') + assert mock_wiki_blob.ref == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('ref') + assert mock_wiki_blob.project_id == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('project_id') + assert mock_wiki_blob.group_id == GitLabMockData.MOCK_WIKI_BLOB_DICT.get('group_id') + + +def test_wiki_blob_missing_fields(): + # Create dict with missing fields + wiki_blob_dict = { + "id": "ed899a2f4b50b4370feeea94676502b42383c746", + "basename": "ed899a2f4b5", + } + wiki_blob_object = wiki_blob.create_from_dict(wiki_blob_dict) + # Test that the WikiBlob object is of the correct type + assert isinstance(wiki_blob_object, wiki_blob.WikiBlob) + + # Test that the WikiBlob object has the correct attributes + assert wiki_blob_object.id == wiki_blob_dict.get('id') + assert wiki_blob_object.basename == wiki_blob_dict.get('basename') + assert wiki_blob_object.data is None + assert wiki_blob_object.path is None + assert wiki_blob_object.filename is None + assert wiki_blob_object.ref is None + assert wiki_blob_object.project_id is None + assert wiki_blob_object.group_id is None 
diff --git a/tests/unit/test_unit_utils.py b/tests/unit/test_unit_utils.py new file mode 100644 index 0000000..73c964d --- /dev/null +++ b/tests/unit/test_unit_utils.py @@ -0,0 +1,217 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Dict, Any + +import pytest + +from gitlab_watchman.utils import ( + convert_to_epoch, + convert_to_utc_datetime, + deduplicate_results, + convert_to_dict, + split_to_chunks +) + + +def test_convert_to_epoch_string(): + # Test with a correct ISO 8601 timestamp containing seconds + string_timestamp = '2021-09-20T10:00:00.000+00:00' + expected_output = 1632132000 + assert convert_to_epoch(string_timestamp) == expected_output + + # Test with a correct ISO 8601 timestamp containing milliseconds + string_timestamp = '2021-09-20T10:00:00.123+00:00' + expected_output = 1632132000 + assert convert_to_epoch(string_timestamp) == expected_output + + # Test with a correct ISO 8601 timestamp containing microseconds + string_timestamp = '2021-09-20T10:00:00.123456+00:00' + expected_output = 1632132000 + assert convert_to_epoch(string_timestamp) == expected_output + + # Test with a correct ISO 8601 timestamp with different timezone - +05:00 + string_timestamp = '2021-09-20T15:00:00.000+05:00' + expected_output = 1632132000 + assert convert_to_epoch(string_timestamp) == expected_output + + # Test with a correct ISO 8601 timestamp with different timezone - -05:00 + string_timestamp = '2021-09-20T05:00:00.000-05:00' + expected_output = 1632132000 + assert convert_to_epoch(string_timestamp) == expected_output + + # Test with None input - Should gracefully fail and return None + string_timestamp = None + expected_output = None + assert convert_to_epoch(string_timestamp) == expected_output + + +def test_convert_to_epoch_datetime(): + # Test an int is returned when passing a datetime object + assert isinstance(convert_to_epoch(datetime.now()), int) + + # Test with None input - Should gracefully fail and return None + 
string_timestamp = None + expected_output = None + assert convert_to_epoch(string_timestamp) == expected_output + + # Test correct epoch timestamp is returned for datetime object + string_timestamp = '2021-09-20T10:00:00.000+00:00' + dt_input = datetime.strptime(string_timestamp, '%Y-%m-%dT%H:%M:%S.%f%z') + expected_output = 1632132000 + assert convert_to_epoch(dt_input) == expected_output + + # Test correct epoch timestamp is returned for datetime object with different timezone + string_timestamp = '2021-09-20T15:00:00.000+05:00' + dt_input = datetime.strptime(string_timestamp, '%Y-%m-%dT%H:%M:%S.%f%z') + expected_output = 1632132000 + assert convert_to_epoch(dt_input) == expected_output + + # Test correct epoch timestamp is returned for datetime object with different timezone + string_timestamp = '2021-09-20T05:00:00.000-05:00' + dt_input = datetime.strptime(string_timestamp, '%Y-%m-%dT%H:%M:%S.%f%z') + expected_output = 1632132000 + assert convert_to_epoch(dt_input) == expected_output + + +def test_convert_to_utc_datetime(): + # Test datetime object is returned + assert isinstance(convert_to_utc_datetime('2021-09-20T10:00:00.000+00:00'), datetime) + + # Test with a correct ISO 8601 timestamp containing seconds + string_timestamp = '2021-09-20T10:00:00.000+00:00' + expected_output = '2021-09-20 10:00:00' + assert convert_to_utc_datetime(string_timestamp).strftime('%Y-%m-%d %H:%M:%S') == expected_output + + # Test with a correct ISO 8601 timestamp containing milliseconds + string_timestamp = '2021-09-20T10:00:00.123+00:00' + expected_output = '2021-09-20 10:00:00' + assert convert_to_utc_datetime(string_timestamp).strftime('%Y-%m-%d %H:%M:%S') == expected_output + + # Test with a correct ISO 8601 timestamp containing microseconds + string_timestamp = '2021-09-20T10:00:00.123456+00:00' + expected_output = '2021-09-20 10:00:00' + assert convert_to_utc_datetime(string_timestamp).strftime('%Y-%m-%d %H:%M:%S') == expected_output + + # Test with a correct ISO 8601 
timestamp with different timezone - +05:00 + string_timestamp = '2021-09-20T15:00:00.000+05:00' + expected_output = '2021-09-20 10:00:00' + assert convert_to_utc_datetime(string_timestamp).strftime('%Y-%m-%d %H:%M:%S') == expected_output + + # Test with a correct ISO 8601 timestamp with different timezone - -05:00 + string_timestamp = '2021-09-20T05:00:00.000-05:00' + expected_output = '2021-09-20 10:00:00' + assert convert_to_utc_datetime(string_timestamp).strftime('%Y-%m-%d %H:%M:%S') == expected_output + + # Test with output string containing timezone + string_timestamp = '2021-09-20T05:00:00.000-05:00' + expected_output = '2021-09-20 10:00:00 UTC' + assert convert_to_utc_datetime(string_timestamp).strftime('%Y-%m-%d %H:%M:%S %Z') == expected_output + + # Test with None input - Should gracefully fail and return None + string_timestamp = None + expected_output = None + assert convert_to_utc_datetime(string_timestamp) == expected_output + + # Test with YYYY-MM-DD input - Should return a datetime object with the timezone set to UTC + string_timestamp = '2021-09-20' + expected_output = datetime(2021, 9, 20) + assert convert_to_utc_datetime(string_timestamp) == expected_output + + +@dataclass +class TestClass: + __test__ = False + name: str + age: int + + +@pytest.fixture +def simple_example_result() -> Dict[Any, Any]: + return { + "file": { + "created": "2024-01-01 00:00:00 UTC", + "editable": False, + "user": "UABC123" + }, + "user": { + "name": "Joe Bloggs", + "age": 30, + }, + "watchman_id": "abc123" + } + + +@pytest.fixture +def dataclass_example_result_one() -> Dict[Any, Any]: + return { + "file": { + "created": "2024-01-01 00:00:00 UTC", + "editable": False, + "user": "UABC123" + }, + "user": TestClass(name='Joe Bloggs', age=30), + "watchman_id": "abc123" + } + + +@pytest.fixture +def dataclass_example_result_two() -> Dict[Any, Any]: + return { + "match_string": "2840631", + "message": { + "created": "2024-01-01 00:00:00 UTC", + "id": 
"abcdefghijklmnopqrstuvwxyz", + "permalink": "https://example.com", + "text": "This is a message", + "timestamp": "1729257170.452549", + "type": "message", + "user": TestClass(name='John Smith', age=30) + }, + "watchman_id": "abc1234" + } + + +def test_convert_to_dict(simple_example_result: Dict[Any, Any], + dataclass_example_result_one: Dict[Any, Any]) -> None: + # Test with simple example + assert convert_to_dict(simple_example_result) == simple_example_result + + # Test with dataclass example + assert convert_to_dict(dataclass_example_result_one) == simple_example_result + + +def test_deduplicate_results(simple_example_result: Dict[Any, Any], + dataclass_example_result_one: Dict[Any, Any], + dataclass_example_result_two: Dict[Any, Any]) -> None: + # Test with a single result + assert deduplicate_results([simple_example_result]) == [simple_example_result] + + # Test with multiple results containing duplicates + assert deduplicate_results([simple_example_result, simple_example_result]) == [ + simple_example_result] + + # Test with dataclass example + assert deduplicate_results([dataclass_example_result_one]) == [convert_to_dict(dataclass_example_result_one)] + + # Test with multiple dataclass examples with no duplicates + assert deduplicate_results([dataclass_example_result_one, dataclass_example_result_two]) == [ + convert_to_dict(dataclass_example_result_two), convert_to_dict(dataclass_example_result_one)] + + # Test with multiple dataclass examples with duplicates + assert (deduplicate_results([dataclass_example_result_one, dataclass_example_result_one]) == + [convert_to_dict(dataclass_example_result_one)]) + + +def test_split_to_chunks(): + # Define test cases + test_cases = [ + ([1, 2, 3, 4, 5, 6, 7], 3, [[1, 4, 7], [2, 5], [3, 6]]), + ([1, 2, 3, 4, 5, 6], 2, [[1, 3, 5], [2, 4, 6]]), + ([1, 2, 3, 4], 4, [[1], [2], [3], [4]]), + ([1, 2, 3], 1, [[1, 2, 3]]), + ([], 3, [[], [], []]) # Edge case: empty input list + ] + + for input_list, no_of_chunks, expected in 
test_cases: + result = list(split_to_chunks(input_list, no_of_chunks)) + assert result == expected, f"Failed for input_list={input_list}, no_of_chunks={no_of_chunks}" \ No newline at end of file