Skip to content

Commit

Permalink
chg: Initial changes to use new annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed Jan 17, 2024
1 parent ac0421a commit 0562c63
Show file tree
Hide file tree
Showing 38 changed files with 731 additions and 723 deletions.
16 changes: 13 additions & 3 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
[mypy]
ignore_errors = False

strict = True
warn_return_any = False
show_error_context = True
pretty = True
exclude = pymisp/data|example|docs
exclude = feed-generator|examples

# Stuff to remove gradually
disallow_untyped_defs = False
disallow_untyped_calls = False
check_untyped_defs = False
disable_error_code = attr-defined,type-arg,no-untyped-def


[mypy-docs.source.*]
ignore_errors = True
4 changes: 3 additions & 1 deletion pymisp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
import sys
import warnings
Expand Down Expand Up @@ -59,4 +61,4 @@ def warning_2024():
pass
logger.debug('pymisp loaded properly')
except ImportError as e:
logger.warning('Unable to load pymisp properly: {}'.format(e))
logger.warning(f'Unable to load pymisp properly: {e}')
61 changes: 32 additions & 29 deletions pymisp/abstract.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
#!/usr/bin/env python3

from __future__ import annotations

import logging
from datetime import date, datetime
from deprecated import deprecated # type: ignore
from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
from enum import Enum
from typing import Union, Optional, Any, Dict, List, Set, Mapping
from typing import Any, Mapping
from collections.abc import MutableMapping
from functools import lru_cache
from pathlib import Path

try:
import orjson # type: ignore
from orjson import loads, dumps # type: ignore
from orjson import loads, dumps
HAS_ORJSON = True
except ImportError:
from json import loads, dumps
Expand All @@ -30,12 +33,12 @@
describe_types = loads(f.read())['result']


class MISPFileCache(object):
class MISPFileCache:
# cache up to 150 JSON structures in class attribute

@staticmethod
@lru_cache(maxsize=150)
def _load_json(path: Path) -> Optional[dict]:
def _load_json(path: Path) -> dict | None:
if not path.exists():
return None
with path.open('rb') as f:
Expand Down Expand Up @@ -65,7 +68,7 @@ class Analysis(Enum):
completed = 2


def _int_to_str(d: Dict[str, Any]) -> Dict[str, Any]:
def _int_to_str(d: dict[str, Any]) -> dict[str, Any]:
# transform all integer back to string
for k, v in d.items():
if isinstance(v, dict):
Expand Down Expand Up @@ -94,7 +97,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
__misp_objects_path = misp_objects_path
__describe_types = describe_types

def __init__(self, **kwargs: Dict):
def __init__(self, **kwargs: dict):
"""Abstract class for all the MISP objects.
NOTE: Every method in every classes inheriting this one are doing
changes in memory and do not modify data on a remote MISP instance.
Expand All @@ -103,9 +106,9 @@ def __init__(self, **kwargs: Dict):
"""
super().__init__()
self.__edited: bool = True # As we create a new object, we assume it is edited
self.__not_jsonable: List[str] = []
self._fields_for_feed: Set
self.__self_defined_describe_types: Optional[Dict] = None
self.__not_jsonable: list[str] = []
self._fields_for_feed: set
self.__self_defined_describe_types: dict | None = None
self.uuid: str

if kwargs.get('force_timestamps') is not None:
Expand All @@ -115,13 +118,13 @@ def __init__(self, **kwargs: Dict):
self.__force_timestamps = False

@property
def describe_types(self) -> Dict:
def describe_types(self) -> dict:
if self.__self_defined_describe_types:
return self.__self_defined_describe_types
return self.__describe_types

@describe_types.setter
def describe_types(self, describe_types: Dict):
def describe_types(self, describe_types: dict):
self.__self_defined_describe_types = describe_types

@property
Expand All @@ -133,7 +136,7 @@ def misp_objects_path(self) -> Path:
return self.__misp_objects_path

@misp_objects_path.setter
def misp_objects_path(self, misp_objects_path: Union[str, Path]):
def misp_objects_path(self, misp_objects_path: str | Path):
if isinstance(misp_objects_path, str):
misp_objects_path = Path(misp_objects_path)
self.__misp_objects_path = misp_objects_path
Expand All @@ -155,7 +158,7 @@ def update_not_jsonable(self, *args) -> None:
"""Add entries to the __not_jsonable list"""
self.__not_jsonable += args

def set_not_jsonable(self, args: List[str]) -> None:
def set_not_jsonable(self, args: list[str]) -> None:
"""Set __not_jsonable to a new list"""
self.__not_jsonable = args

Expand All @@ -171,7 +174,7 @@ def from_json(self, json_string: str) -> None:
"""Load a JSON string"""
self.from_dict(**loads(json_string))

def to_dict(self, json_format: bool = False) -> Dict:
def to_dict(self, json_format: bool = False) -> dict:
"""Dump the class to a dictionary.
This method automatically removes the timestamp recursively in every object
that has been edited is order to let MISP update the event accordingly."""
Expand Down Expand Up @@ -213,15 +216,15 @@ def to_dict(self, json_format: bool = False) -> Dict:
to_return = _int_to_str(to_return)
return to_return

def jsonable(self) -> Dict:
def jsonable(self) -> dict:
"""This method is used by the JSON encoder"""
return self.to_dict()

def _to_feed(self) -> Dict:
def _to_feed(self) -> dict:
if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed:
raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.')
if hasattr(self, '_set_default') and callable(self._set_default): # type: ignore
self._set_default() # type: ignore
if hasattr(self, '_set_default') and callable(self._set_default):
self._set_default()
to_return = {}
for field in sorted(self._fields_for_feed):
if getattr(self, field, None) is not None:
Expand All @@ -235,11 +238,11 @@ def _to_feed(self) -> Dict:
if field in ['data', 'first_seen', 'last_seen', 'deleted']:
# special fields
continue
raise PyMISPError('The field {} is required in {} when generating a feed.'.format(field, self.__class__.__name__))
raise PyMISPError(f'The field {field} is required in {self.__class__.__name__} when generating a feed.')
to_return = _int_to_str(to_return)
return to_return

def to_json(self, sort_keys: bool = False, indent: Optional[int] = None) -> str:
def to_json(self, sort_keys: bool = False, indent: int | None = None) -> str:
"""Dump recursively any class of type MISPAbstract to a json string"""
if HAS_ORJSON:
option = 0
Expand Down Expand Up @@ -320,14 +323,14 @@ def __setattr__(self, name: str, value: Any):
self.__edited = True
super().__setattr__(name, value)

def _datetime_to_timestamp(self, d: Union[int, float, str, datetime]) -> int:
def _datetime_to_timestamp(self, d: int | float | str | datetime) -> int:
"""Convert a datetime object to a timestamp (int)"""
if isinstance(d, (int, float, str)):
# Assume we already have a timestamp
return int(d)
return int(d.timestamp())

def _add_tag(self, tag: Optional[Union[str, 'MISPTag', Mapping]] = None, **kwargs):
def _add_tag(self, tag: str | MISPTag | Mapping | None = None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
if isinstance(tag, str):
misp_tag = MISPTag()
Expand All @@ -347,7 +350,7 @@ def _add_tag(self, tag: Optional[Union[str, 'MISPTag', Mapping]] = None, **kwarg
self.edited = True
return misp_tag

def _set_tags(self, tags: List['MISPTag']):
def _set_tags(self, tags: list[MISPTag]):
"""Set a list of prepared MISPTag."""
if all(isinstance(x, MISPTag) for x in tags):
self.Tag = tags
Expand All @@ -363,19 +366,19 @@ def __eq__(self, other) -> bool:
return False

def __repr__(self) -> str:
return '<{self.__class__.__name__} - please define me>'.format(self=self)
return f'<{self.__class__.__name__} - please define me>'


class MISPTag(AbstractMISP):

_fields_for_feed: set = {'name', 'colour', 'relationship_type', 'local'}

def __init__(self, **kwargs: Dict):
def __init__(self, **kwargs: dict):
super().__init__(**kwargs)
self.name: str
self.exportable: bool
self.local: bool
self.relationship_type: Optional[str]
self.relationship_type: str | None

def from_dict(self, **kwargs):
if kwargs.get('Tag'):
Expand All @@ -390,7 +393,7 @@ def _set_default(self):
if not hasattr(self, 'local'):
self.local = False

def _to_feed(self, with_local: bool = True) -> Dict:
def _to_feed(self, with_local: bool = True) -> dict:
if hasattr(self, 'exportable') and not self.exportable:
return {}
if with_local is False and hasattr(self, 'local') and self.local:
Expand All @@ -404,11 +407,11 @@ def delete(self):
def __repr__(self) -> str:
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})>'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)>'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)>'


# UUID, datetime, date and Enum is serialized by ORJSON by default
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[Dict, str]:
def pymisp_json_default(obj: AbstractMISP | datetime | date | Enum | UUID) -> dict | str:
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime, date)):
Expand Down
Loading

0 comments on commit 0562c63

Please sign in to comment.