Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/python_run_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# CI workflow: on every push, install the project with Poetry, lint with
# pylint (advisory only) and run the pytest suite under coverage for each
# Python version listed in the matrix.
name: Run Unit Test via Pytest

on:
  push:

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12", "3.13"]

    steps:
      # checkout@v4 / setup-python@v5 replace the Node 16 based v3 / v4 releases.
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install poetry
          poetry install --with dev
      - name: Analysing the code with pylint
        run: |
          poetry run pylint $(git ls-files '*.py')
        # Lint findings are reported but must not fail the build.
        continue-on-error: true
      - name: Test with pytest
        run: |
          poetry run coverage run -m pytest -v -s
      - name: Generate Coverage Report
        run: |
          poetry run coverage report -m
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
### Added
- Signatures now loaded into memory instead of being saved to disk. This allows for running on read-only filesystems.
- Tests for Docker build
- Enhanced deduplication of findings
- The same match should not be returned multiple times within the same scope. E.g. if a token is found in a commit, it should not be returned multiple times in the same commit.
- All dates are now converted and logged in UTC
- Unit tests added for models and utils

### Fixed
- Error when searching wiki-blobs
Expand Down
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ colorama = "^0.4.6"
pyyaml = "^6.0.2"
requests = "^2.32.3"
python-gitlab = "^5.0.0"
pytz = "^2024.2"

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.3"
Expand Down
146 changes: 78 additions & 68 deletions src/gitlab_watchman/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
import argparse
import calendar
import datetime
import multiprocessing
import os
import sys
import time
import datetime
import traceback
from dataclasses import dataclass
from importlib import metadata
from typing import List

from gitlab_watchman import watchman_processor
from gitlab_watchman.clients.gitlab_client import GitLabAPIClient
from gitlab_watchman.signature_downloader import SignatureDownloader
from gitlab_watchman.loggers import JSONLogger, StdoutLogger, log_to_csv
from gitlab_watchman.models import (
signature,
user,
project,
group
)
from gitlab_watchman.exceptions import (
GitLabWatchmanError,
GitLabWatchmanGetObjectError,
Expand All @@ -26,35 +21,55 @@
ElasticsearchMissingError,
MissingEnvVarError
)
from gitlab_watchman.clients.gitlab_client import GitLabAPIClient
from gitlab_watchman.loggers import (
JSONLogger,
StdoutLogger,
log_to_csv,
init_logger
)
from gitlab_watchman.models import (
signature,
user,
project,
group
)


@dataclass
class SearchArgs:
    """ Bundle of the settings that every search call in a run shares.

    A single instance is built in main() and handed to perform_search() and
    search(), replacing the long positional argument lists.
    """

    # Client used to talk to the GitLab API
    gitlab_client: GitLabAPIClient
    # Signatures to look for
    sig_list: List[signature.Signature]
    # Timeframe to search for
    timeframe: int
    # Type of logging in use; forwarded to the processor
    logging_type: str
    # Logger object that the processor writes to
    log_handler: JSONLogger | StdoutLogger
    # Whether to use debug level logging or not
    debug: bool
    # Whether to use verbose logging or not
    verbose: bool
    # GitLab scopes to search, e.g. 'blobs' or 'commits'
    scopes: List[str]

def search(gitlab_connection: GitLabAPIClient,
sig: signature.Signature,
timeframe: int,
scope: str,
verbose: bool):

def search(search_args: SearchArgs, sig: signature.Signature, scope: str):
""" Use the appropriate search function to search GitLab based on the contents
of the signature file. Output results to stdout

Args:
gitlab_connection: GitLab API object
search_args: SearchArgs object
sig: Signature object
timeframe: Timeframe to search for
scope: What sort of GitLab objects to search
verbose: Whether to use verbose logging or not
"""

try:
OUTPUT_LOGGER.log('INFO', f'Searching for {sig.name} in {scope}')

results = watchman_processor.search(
gitlab=gitlab_connection,
log_handler=OUTPUT_LOGGER,
gitlab=search_args.gitlab_client,
logging_type=search_args.logging_type,
log_handler=search_args.log_handler,
debug=search_args.debug,
sig=sig,
scope=scope,
verbose=verbose,
timeframe=timeframe)
verbose=search_args.verbose,
timeframe=search_args.timeframe)
if results:
for log_data in results:
OUTPUT_LOGGER.log(
Expand All @@ -71,41 +86,18 @@ def search(gitlab_connection: GitLabAPIClient,
raise e


def perform_search(gitlab_connection: GitLabAPIClient,
sig_list: List[signature.Signature],
timeframe: int,
verbose_logging: bool,
scopes: List[str]):
def perform_search(search_args: SearchArgs):
""" Helper function to perform the search for each signature and each scope

Args:
gitlab_connection: GitLab API object
sig_list: List of Signature objects
timeframe: Timeframe to search for
verbose_logging: Whether to use verbose logging or not
scopes: List of scopes to search
search_args: SearchArgs object
"""

for sig in sig_list:
for sig in search_args.sig_list:
if sig.scope:
for scope in scopes:
for scope in search_args.scopes:
if scope in sig.scope:
search(gitlab_connection, sig, timeframe, scope, verbose_logging)


def init_logger(logging_type: str, debug: bool) -> JSONLogger | StdoutLogger:
    """ Build the logger used for output. Stdout is the fallback when no
    logging type is supplied

    Args:
        logging_type: Type of logging to use
        debug: Whether to use debug level logging or not
    Returns:
        StdoutLogger for an empty or 'stdout' logging type, otherwise JSONLogger
    """

    # Anything that is set and is not 'stdout' selects JSON output.
    wants_json = bool(logging_type) and logging_type != 'stdout'
    logger_class = JSONLogger if wants_json else StdoutLogger
    return logger_class(debug=debug)
search(search_args, sig, scope)


def validate_variables() -> bool:
Expand Down Expand Up @@ -261,44 +253,62 @@ def main():
'SUCCESS',
f'Projects output to CSV file: {os.path.join(os.getcwd(), "gitlab_projects.csv")}')

search_args = SearchArgs(
gitlab_client=gitlab_client,
sig_list=signature_list,
timeframe=timeframe,
logging_type=logging_type,
log_handler=OUTPUT_LOGGER,
debug=debug,
verbose=verbose,
scopes=[])

if everything:
OUTPUT_LOGGER.log('INFO', 'Getting everything...')
perform_search(gitlab_client, signature_list, timeframe, verbose,
[
'blobs',
'commits',
'issues',
'merge_requests',
'wiki_blobs',
'milestones',
'notes',
'snippet_titles'
])
search_args.scopes = [
'blobs',
'commits',
'issues',
'merge_requests',
'wiki_blobs',
'milestones',
'notes',
'snippet_titles'
]
perform_search(search_args)
else:
if blobs:
OUTPUT_LOGGER.log('INFO', 'Searching blobs')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['blobs'])
search_args.scopes = ['blobs']
perform_search(search_args)
if commits:
OUTPUT_LOGGER.log('INFO', 'Searching commits')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['commits'])
search_args.scopes = ['commits']
perform_search(search_args)
if issues:
OUTPUT_LOGGER.log('INFO', 'Searching issues')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['issues'])
search_args.scopes = ['issues']
perform_search(search_args)
if merge:
OUTPUT_LOGGER.log('INFO', 'Searching merge requests')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['merge_requests'])
search_args.scopes = ['merge_requests']
perform_search(search_args)
if wiki:
OUTPUT_LOGGER.log('INFO', 'Searching wiki blobs')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['wiki_blobs'])
search_args.scopes = ['wiki_blobs']
perform_search(search_args)
if milestones:
OUTPUT_LOGGER.log('INFO', 'Searching milestones')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['milestones'])
search_args.scopes = ['milestones']
perform_search(search_args)
if notes:
OUTPUT_LOGGER.log('INFO', 'Searching notes')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['notes'])
search_args.scopes = ['notes']
perform_search(search_args)
if snippets:
OUTPUT_LOGGER.log('INFO', 'Searching snippets')
perform_search(gitlab_client, signature_list, timeframe, verbose, ['snippet_titles'])
search_args.scopes = ['snippet_titles']
perform_search(search_args)

OUTPUT_LOGGER.log('SUCCESS', f'GitLab Watchman finished execution - Execution time:'
f' {str(datetime.timedelta(seconds=time.time() - start_time))}')
Expand Down
30 changes: 21 additions & 9 deletions src/gitlab_watchman/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from typing import Any, Dict, List, ClassVar, Protocol
from colorama import Fore, Back, Style, init

from gitlab_watchman.utils import EnhancedJSONEncoder


class StdoutLogger:
def __init__(self, **kwargs):
Expand Down Expand Up @@ -102,9 +104,11 @@ def log(self,
f' -----'
elif scope == 'wiki_blobs':
if message.get('project_wiki'):
wiki_path = f'{message.get("project").get("web_url")}/-/wikis/{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}'
wiki_path = (f'{message.get("project").get("web_url")}/-/wikis/'
f'{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}')
elif message.get('group_wiki'):
wiki_path = f'{message.get("group").get("web_url")}/-/wikis/{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}'
wiki_path = (f'{message.get("group").get("web_url")}/-/wikis/'
f'{urllib.parse.quote_plus(message.get("wiki_blob").get("path"))}')
else:
wiki_path = 'N/A'

Expand Down Expand Up @@ -260,13 +264,6 @@ def print_header(self) -> None:
print(' '.ljust(79) + Fore.GREEN)


class EnhancedJSONEncoder(json.JSONEncoder):
    """ JSON encoder that can also serialise dataclass instances.

    Dataclass instances are converted to dictionaries with
    dataclasses.asdict(); every other object is left to json.JSONEncoder.
    """

    def default(self, o):
        """ Return a JSON serialisable stand-in for an object the standard
        encoder cannot handle

        Args:
            o: Object to convert
        Returns:
            Dictionary of the fields when o is a dataclass instance
        Raises:
            TypeError: Raised by the base encoder for any other unsupported object
        """

        # is_dataclass() is also True for dataclass *types*, which asdict()
        # rejects, so only instances may take this branch.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        return super().default(o)


class JSONLogger(Logger):
def __init__(self, name: str = 'gitlab_watchman', **kwargs):
super().__init__(name)
Expand Down Expand Up @@ -349,3 +346,18 @@ def log_to_csv(csv_name: str, export_data: List[IsDataclass]) -> None:
f.close()
except Exception as e:
print(e)


def init_logger(logging_type: str, debug: bool) -> JSONLogger | StdoutLogger:
    """ Pick and construct the logger for this run. Falls back to stdout
    when no logging type is given

    Args:
        logging_type: Type of logging to use
        debug: Whether to use debug level logging or not
    Returns:
        Logger object
    """

    # Guard clause: only a set, non-'stdout' logging type produces JSON output.
    if logging_type and logging_type != 'stdout':
        return JSONLogger(debug=debug)
    return StdoutLogger(debug=debug)
Loading