diff --git a/.travis.yml b/.travis.yml index c4c3275..526a009 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,9 +4,16 @@ python: - "3.3" - "3.4" - "3.5" + - "3.5-dev" + - "3.6-dev" +matrix: + allow_failures: + - python: "3.5-dev" + - python: "3.6-dev" + # command to install dependencies install: # - "pip install -r requirements.txt" - - "pip install setuptools --upgrade; python setup.py install; pip install nose" + - "pip install setuptools --upgrade; pip install -e .[sha3,pyblake2]; pip install nose" # command to run tests -script: nosetests +script: nosetests -vs diff --git a/omnihash/__init__.py b/omnihash/__init__.py index 18b843c..1b0667a 100644 --- a/omnihash/__init__.py +++ b/omnihash/__init__.py @@ -1,5 +1,376 @@ -__version__ = '0.12.1' -__license__ = "MIT License" -__title__ = "omnihash" +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from collections import OrderedDict +import hashlib +import io +import json +import os +import sys + +import click +import pkg_resources +import validators + +import functools as fnt +import itertools as itt + + +from omnihash._version import __version__ +__license__ = "MIT License" +__title__ = "omnihash" __summary__ = "Hash files/strings/streams/network-resources simultaneously in various algorithms." __uri__ = "https://github.com/Miserlou/omnihash" + + +class DigesterFactories(OrderedDict): + """ + Implements the inclusion/exclusion logic for registering *digester-factories*. + + This dict contains pairs like this:: + + {: } + + where a ```` are functions like this:: + + foo(fsize_or_none) -> digester + + A *digester* must support the following methods: + + - ``update(bytes)`` + - ``hexdigest() -> [str, bytes] # case-insensitive`` + + .. Note:: + The *algo-names* must alway be given in UPPER. + + """ + def __init__(self, includes, excludes): + super(DigesterFactories, self).__init__() + self.includes = includes + self.excludes = excludes + + def register_if_accepted(self, algo, factory): + assert algo.isupper(), algo + if self.is_algo_accepted(algo): + self[algo] = factory + + def is_algo_accepted(self, algo): + """ + Invoked by :meth:`register_if_accepted()` or by client BEFORE item-assign, not to create needless dig-factory. + + :param algo: + The UPPER name of the digester to be used as the key in the registry. + """ + assert algo.isupper(), algo + includes = self.includes + excludes = self.excludes + is_included = not includes or any(f in algo for f in includes) + is_excluded = excludes and any(f in algo for f in excludes) + + return is_included and not is_excluded + + +def git_header(otype, fsize): + return ("%s %i\0" % (otype, fsize)).encode() + + +class GitSlurpDigester: + """ + Produce Git-like hashes for bytes without knowing their size a priori. + + Git SHA1-hashes the file-bytes prefixed with the filesize. + So when reading STDIN, we have to slurp the bytes to derive their length, + and hash them afterwards. + + But it's not that we slurp multiple files, just the STDIN once. + """ + + fbytes = b'' + + def __init__(self, otype): + # str + self.otype = otype + + def update(self, fbytes): + self.fbytes += fbytes + + def hexdigest(self): + fsize = len(self.fbytes) + digester = hashlib.sha1(git_header(self.otype, fsize)) + digester.update(self.fbytes) + return digester.hexdigest() + + +def append_git_digesters(digfacts): + """ + Note that contrary to ``git hash-object`` no unix2dos EOL is done! + + :param digfacts: + :type digfacts: DigesterFactories + """ + + def git_factory(otype, fsize): + """If `fsize` is known, chunk-hash file, else it slurps it.""" + if fsize is None: + digester = GitSlurpDigester(otype) + else: + digester = hashlib.sha1(git_header(otype, fsize)) + + return digester + + algo_pairs = (('GIT-%s' % otype.upper(), otype) for otype in 'blob commit tag'.split()) + digfacts.update(('GIT-%s' % otype.upper(), fnt.partial(git_factory, otype)) + for algo, otype in algo_pairs + if digfacts.is_algo_accepted(algo)) + + +class LenDigester: + fsize = 0 + + def __init__(self, fsize): + if fsize is not None: + self.fsize = -fsize + + def update(self, b): + if self.fsize >= 0: + self.fsize += len(b) + + def hexdigest(self): + if self.fsize < 0: + self.fsize = -self.fsize + return str(self.fsize) + + +class FileIter(object): + """An iterator that chunks in bytes a file-descriptor, auto-closing it when exhausted.""" + def __init__(self, fd): + self._fd = fd + self._iter = iter(lambda: fd.read(io.DEFAULT_BUFFER_SIZE), b'') + + def __iter__(self): + return self._iter + + def next(self): + try: + return self._iter.next() + except StopIteration: + self._fd.close() + raise + + +## +# CLI +## + +@click.command() +@click.argument('hashmes', nargs=-1) +@click.option('-s', is_flag=True, default=False, help="Hash input as string, even if there is a file with that name.") +@click.option('-v', is_flag=True, default=False, help="Show version and quit.") +@click.option('-c', is_flag=True, default=False, help="Calculate CRCs as well.") +@click.option('-f', is_flag=False, default=False, multiple=True, + help=("Select a family of algorithms: " + "include only algos having TEXT in their names." + "Use it multiple times to select more families.")) +@click.option('-x', is_flag=False, default=False, multiple=True, + help=("Exclude a family of algorithms: " + "skip algos having TEXT in their names." + "Use it multiple times to exclude more families.")) +@click.option('-m', is_flag=False, default=False, help="Match input string.") +@click.option('-j', is_flag=True, default=False, help="Output result in JSON format.") +@click.pass_context +def main(click_context, hashmes, s, v, c, f, x, m, j): + """ + If there is a file at `hashme`, read and omnihash that. + Otherwise, assume `hashme` is a string. + """ + + # Print version and quit + if v: + version = pkg_resources.require("omnihash")[0].version + click.echo(version) + return + + m = m and m.lower() + digfacts = collect_digester_factories(f, x, c) + + results = [] + if not hashmes: + # If no stdin, just help and quit. + if not sys.stdin.isatty(): + stdin = click.get_binary_stream('stdin') + bytechunks = iter(lambda: stdin.read(io.DEFAULT_BUFFER_SIZE), b'') + if not j: + click.echo("Hashing " + click.style("standard input", bold=True) + "..", err=True) + results.append([produce_hashes(None, bytechunks, digfacts, match=m, use_json=j)]) + else: + print(click_context.get_help()) + return + else: + hash_many = len(hashmes) > 1 + for hashme in hashmes: + result = {} + data = iterate_bytechunks(hashme, s, j, hash_many) + if data: + length, bytechunks = data + result = produce_hashes(length, bytechunks, digfacts, match=m, use_json=j) + if result: + result['NAME'] = hashme + results.append(result) + + if results and j: + print(json.dumps(results, indent=4, sort_keys=True)) + + +## +# Main Logic +## + +def iterate_bytechunks(hashme, is_string, use_json, hash_many): + """ + Return iterable bytes and content-length if possible. + """ + + # URL + if not is_string and validators.url(hashme): + import requests + + if not use_json: + click.echo("Hashing content of URL " + click.style(hashme, bold=True) + "..", err=not hash_many) + try: + response = requests.get(hashme) + except requests.exceptions.ConnectionError as e: + raise ValueError("Not a valid URL. :(") + except Exception as e: + raise ValueError("Not a valid URL. {}.".format(e)) + if response.status_code != 200: + click.echo("Response returned %s. :(" % response.status_code, err=True) + try: + fsize = int(response.headers.get('Content-Length')) + except Exception as ex: + click.echo("[Could not get response-size due to: %s" % ex, err=True) + fsize = None + bytechunks = response.iter_content() + # File + elif os.path.exists(hashme) and not is_string: + if os.path.isdir(hashme): + if not use_json: + click.echo(click.style("Skipping", fg="yellow") + " directory " + "'" + hashme + "'..", err=True) + return None + + if not use_json: + click.echo("Hashing file " + click.style(hashme, bold=True) + "..", err=not hash_many) + fsize = os.stat(hashme).st_size + bytechunks = FileIter(open(hashme, mode='rb')) + # String + else: + if not use_json: + click.echo("Hashing string " + click.style(hashme, bold=True) + "..", err=not hash_many) + bhashme = hashme.encode('utf-8') + fsize = len(bhashme) + bytechunks = (bhashme, ) + + return fsize, bytechunks + + +def append_hashlib_digesters(digfacts): + """Apend python-default digesters.""" + def digester_fact(algo_name, fsize): + # A factory that ignores the `fsize` arg. + return hashlib.new(algo_name) + + algos = sorted(hashlib.algorithms_available) + digfacts.update((algo.upper(), fnt.partial(digester_fact, algo)) + for algo in algos + if algo not in digfacts and digfacts.is_algo_accepted(algo.upper())) + + +def append_crc_digesters(digfacts): + import crcmod.predefined as crcmod + + def digester_fact(crc_name, fsize): + # A factory that ignores the `fsize` arg. + return crcmod.PredefinedCrc(crc_name) + + algos = sorted(rec[0].upper() for rec in crcmod._crc_definitions_table) + digfacts.update((algo, fnt.partial(digester_fact, algo)) + for algo in algos + if digfacts.is_algo_accepted(algo)) + + +def collect_digester_factories(includes, excludes, include_CRCs=False): + """ + Create and return a dictionary of all our active hash algorithms. + + Each digester is a 2-tuple ``( digester.update_func(bytes), digest_func(digester) -> int)``. + """ + from . import plugin + + digfacts = DigesterFactories([i.upper() for i in includes], + [i.upper() for i in excludes]) + + digfacts.register_if_accepted('LENGTH', LenDigester) + append_hashlib_digesters(digfacts) + plugin.append_plugin_digesters(digfacts) + append_git_digesters(digfacts) + if include_CRCs: + append_crc_digesters(digfacts) + + assert all(k.isupper() for k in digfacts.keys()), list(digfacts.keys()) + + return digfacts + + +def produce_hashes(fsize, bytechunks, digfacts, match, use_json=False): + """ + Given our bytes and our algorithms, calculate and print our hashes. + """ + + # Produce hashes + streams = itt.tee(bytechunks, len(digfacts)) + batch = zip(streams, digfacts.items()) + results = {} + + match_found = False + for stream, (algo, fact) in batch: + digester = fact(fsize) + for b in stream: + digester.update(b) + + result = digester.hexdigest() + if isinstance(result, bytes): + result = result.decode() + result = result.lower() + + if match: + if match in result: + echo(algo, result, use_json) + results[algo] = result + match_found = True + else: + results[algo] = result + echo(algo, result, use_json) + + if match: + if not match_found: + if not use_json: + click.echo(click.style("No matches", fg='red') + " found!", err=True) + + return results + + +## +# Util +## + +def echo(algo, digest, json=False): + if not json: + click.echo(' %-*s%s' % (32, click.style(algo, fg='green') + ':', digest)) + +## +# Entrypoint +## + +if __name__ == '__main__': + try: + main() + except ValueError as ex: + echo(ex, err=True) diff --git a/omnihash/_version.py b/omnihash/_version.py new file mode 100644 index 0000000..f8d9095 --- /dev/null +++ b/omnihash/_version.py @@ -0,0 +1 @@ +__version__ = '0.12.1' diff --git a/omnihash/omnihash.py b/omnihash/omnihash.py deleted file mode 100644 index 9054323..0000000 --- a/omnihash/omnihash.py +++ /dev/null @@ -1,323 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# Standard Imports -from collections import OrderedDict -import hashlib -import io -import json -import os -import sys - -import click -import pkg_resources -import requests -import validators - -import crcmod.predefined as crcmod -import itertools as itt - - -## -# Plugins -## -PLUGIN_GROUP_NAME = 'omnihash.plugins' - -known_digesters = OrderedDict() -""" Plugins add here 2-tuples (digester-factory-func, final-hashing-func). """ - - -def intialize_plugins(plugin_group_name=PLUGIN_GROUP_NAME): - entry_points = pkg_resources.working_set.iter_entry_points(plugin_group_name) - for ep in sorted(entry_points, key=lambda ep: ep.name): - try: - plugin_loader = ep.load() - if callable(plugin_loader): - plugin_loader() - except Exception as ex: - click.echo('Failed LOADING plugin(%r@%s) due to: %s' % ( - ep, ep.dist, ex), err=1) - -# Plugin algos -def plugin_sha3_digesters(): - import sha3 # @UnresolvedImport - - known_digesters['SHA3_224'] = (sha3.SHA3224(), lambda d: d.hexdigest().decode("utf-8")) - known_digesters['SHA3_256'] = (sha3.SHA3256(), lambda d: d.hexdigest().decode("utf-8")) - known_digesters['SHA3_384'] = (sha3.SHA3384(), lambda d: d.hexdigest().decode("utf-8")) - known_digesters['SHA3_512'] = (sha3.SHA3512(), lambda d: d.hexdigest().decode("utf-8")) - - -def plugin_pyblake2_digesters(): - import pyblake2 # @UnresolvedImport - - known_digesters['BLAKE2s'] = (pyblake2.blake2s(), lambda d: d.hexdigest()) - known_digesters['BLAKE2b'] = (pyblake2.blake2b(), lambda d: d.hexdigest()) - - -class GitSlurpDigester: - """ - Produce Git-like hashes for bytes without knowing their size a priori. - - Git SHA1-hashes the file-bytes prefixed with the filesize. - So when reading STDIN, we have to slurp the bytes to derive their length, - and hash them afterwards. - - But it's not that we slurp multiple files, just the STDIN once. - """ - - fbytes = b'' - - def __init__(self, otype): - self.otype = otype - - def update(self, fbytes): - self.fbytes += fbytes - - def digest(self): - fsize = len(self.fbytes) - digester = hashlib.sha1(("%s %i\0" % (self.otype, fsize)).encode()) - digester.update(self.fbytes) - return digester.hexdigest() - - -def add_git_digesters(digesters, fpath): - """Note that contrary to ``git hash-object`` no unix2dos EOL is done!""" - try: - fsize = os.stat(fpath).st_size - digesters['GIT-BLOB'] = (hashlib.sha1(b"blob %i\0" % fsize), lambda d: d.hexdigest()) - digesters['GIT-COMMIT'] = (hashlib.sha1(b"commit %i\0" % fsize), lambda d: d.hexdigest()) - digesters['GIT-TAG'] = (hashlib.sha1(b"tag %i\0" % fsize), lambda d: d.hexdigest()) - except: - ## Failback to slurp-digesters `fpath` is not a file. - # - digesters['GIT-BLOB'] = (GitSlurpDigester('blob'), lambda d: d.digest()) - digesters['GIT-COMMIT'] = (GitSlurpDigester('commit'), lambda d: d.digest()) - digesters['GIT-TAG'] = (GitSlurpDigester('tag'), lambda d: d.digest()) - - -## -# Classes -## - -class FileIter(object): - """An iterator that chunks in bytes a file-descriptor, auto-closing it when exhausted.""" - def __init__(self, fd): - self._fd = fd - self._iter = iter(lambda: fd.read(io.DEFAULT_BUFFER_SIZE), b'') - - def __iter__(self): - return self._iter - - def next(self): - try: - return self._iter.next() - except StopIteration: - self._fd.close() - raise - - -class LenDigester: - length = 0 - - def update(self, b): - self.length += len(b) - - def digest(self): - return str(self.length) - -## -# CLI -## - -@click.command() -@click.argument('hashmes', nargs=-1) -@click.option('-s', is_flag=True, default=False, help="Hash input as string, even if there is a file with that name.") -@click.option('-v', is_flag=True, default=False, help="Show version and quit.") -@click.option('-c', is_flag=True, default=False, help="Calculate CRCs as well.") -@click.option('-f', is_flag=False, default=False, multiple=True, - help="Select one or more family of algorithms: " - "include only algos having TEXT (ci) in their names.") -@click.option('-m', is_flag=False, default=False, help="Match input string.") -@click.option('-j', is_flag=True, default=False, help="Output result in JSON format.") -@click.pass_context -def main(click_context, hashmes, s, v, c, f, m, j): - """ - If there is a file at hashme, read and omnihash that file. - Elif hashme is a string, omnihash that. - """ - - # Print version and quit - if v: - version = pkg_resources.require("omnihash")[0].version - click.echo(version) - return - - intialize_plugins() - - results = [] - if not hashmes: - # If no stdin, just help and quit. - if not sys.stdin.isatty(): - digesters = make_digesters(None, f, c) - stdin = click.get_binary_stream('stdin') - bytechunks = iter(lambda: stdin.read(io.DEFAULT_BUFFER_SIZE), b'') - if not j: - click.echo("Hashing " + click.style("standard input", bold=True) + "..", err=True) - results.append([produce_hashes(bytechunks, digesters, match=m, use_json=j)]) - else: - print(click_context.get_help()) - return - else: - hash_many = len(hashmes) > 1 - for hashme in hashmes: - result = {} - digesters = make_digesters(hashme, f, c) - bytechunks = iterate_bytechunks(hashme, s, j, hash_many) - if bytechunks: - result = produce_hashes(bytechunks, digesters, match=m, use_json=j) - if result: - result['NAME'] = hashme - results.append(result) - - if results and j: - print(json.dumps(results, indent=4, sort_keys=True)) - - -## -# Main Logic -## - -def iterate_bytechunks(hashme, is_string, use_json, hash_many): - """ - Prep our bytes. - """ - - # URL - if not is_string and validators.url(hashme): - if not use_json: - click.echo("Hashing content of URL " + click.style(hashme, bold=True) + "..", err=not hash_many) - try: - response = requests.get(hashme) - except requests.exceptions.ConnectionError as e: - raise ValueError("Not a valid URL. :(") - except Exception as e: - raise ValueError("Not a valid URL. {}.".format(e)) - if response.status_code != 200: - click.echo("Response returned %s. :(" % response.status_code, err=True) - bytechunks = response.iter_content() - # File - elif os.path.exists(hashme) and not is_string: - if os.path.isdir(hashme): - if not use_json: - click.echo(click.style("Skipping", fg="yellow") + " directory " + "'" + hashme + "'..", err=True) - return None - - if not use_json: - click.echo("Hashing file " + click.style(hashme, bold=True) + "..", err=not hash_many) - bytechunks = FileIter(open(hashme, mode='rb')) - # String - else: - if not use_json: - click.echo("Hashing string " + click.style(hashme, bold=True) + "..", err=not hash_many) - bytechunks = (hashme.encode('utf-8'), ) - - return bytechunks - - -def make_digesters(fpath, families, include_CRCs=False): - """ - Create and return a dictionary of all our active hash algorithms. - - Each digester is a 2-tuple ``( digester.update_func(bytes), digest_func(digester) -> int)``. - """ - ## TODO: simplify digester-tuple API, ie: (digester, update_func(d), digest_func(d)) - - families = set(f.upper() for f in families) - digesters = OrderedDict() - - digesters['LENGTH'] = (LenDigester(), LenDigester.digest) - - # Default Algos - for algo in sorted(hashlib.algorithms_available): - # algorithms_available can have duplicates - aname = algo.upper() - if aname not in digesters and is_algo_in_families(aname, families): - digesters[aname] = (hashlib.new(algo), lambda d: d.hexdigest()) - - # CRC - if include_CRCs: - for name in sorted(crcmod._crc_definitions_by_name): - crc_name = crcmod._crc_definitions_by_name[name]['name'] - aname = crc_name.upper() - if is_algo_in_families(aname, families): - digesters[aname] = (crcmod.PredefinedCrc(crc_name), - lambda d: hex(d.crcValue)) - - add_git_digesters(digesters, fpath) - - ## Append plugin digesters. - # - digesters.update(known_digesters) - for digester in list(digesters.keys()): - if not is_algo_in_families(digester.upper(), families): - digesters.pop(digester, None) - - return digesters - - -def produce_hashes(bytechunks, digesters, match, use_json=False): - """ - Given our bytes and our algorithms, calculate and print our hashes. - """ - - # Produce hashes - streams = itt.tee(bytechunks, len(digesters)) - batch = zip(streams, digesters.items()) - results = {} - - match_found = False - for stream, (algo, (digester, hashfunc)) in batch: - for b in stream: - digester.update(b) - - result = hashfunc(digester) - if match: - if match in result: - echo(algo, result, use_json) - results[algo] = result - match_found = True - else: - results[algo] = result - echo(algo, result, use_json) - - if match: - if not match_found: - if not use_json: - click.echo(click.style("No matches", fg='red') + " found!", err=True) - - return results - - -## -# Util -## - -def is_algo_in_families(algo_name, families): - """:param algo_name: make sure it is UPPER""" - return not families or any(f in algo_name for f in families) - - -def echo(algo, digest, json=False): - if not json: - click.echo(' %-*s%s' % (32, click.style(algo, fg='green') + ':', digest)) - -## -# Entrypoint -## - -if __name__ == '__main__': - try: - main() - except ValueError as ex: - echo(ex, err=True) diff --git a/omnihash/plugin.py b/omnihash/plugin.py new file mode 100644 index 0000000..4efa4b3 --- /dev/null +++ b/omnihash/plugin.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import click +import pkg_resources + +import functools as fnt + + +## +# Plugins +## +PLUGIN_GROUP_NAME = 'omnihash.plugins' + + +def append_plugin_digesters(digfacts, plugin_group_name=PLUGIN_GROUP_NAME): + """Plugin-loaders accept a :class:`DigesterFactories` instance to register their factory-funcs. """ + entry_points = pkg_resources.working_set.iter_entry_points(plugin_group_name) + entry_points = sorted(entry_points, key=lambda ep: ep.name) + for ep in entry_points: + try: + plugin_loader = ep.load() + if callable(plugin_loader): + plugin_loader(digfacts) + except pkg_resources.DistributionNotFound as ex: + pass + except Exception as ex: + click.echo('Failed LOADING plugin(%r@%s) due to: %s' % ( + ep, ep.dist, ex), err=1) + + +def plugin_sha3_digesters(digfacts): + import sha3 # @UnresolvedImport because it is optional. + + def digester_fact(algo_class, fsize): + # A factory that ignores the `fsize` arg. + return algo_class() + + algo_pairs = ((algo.name.upper(), algo) for algo in (sha3.SHA3224, sha3.SHA3256, sha3.SHA3384, sha3.SHA3512)) + digfacts.update((algo, fnt.partial(digester_fact, cls)) + for algo, cls in algo_pairs + if digfacts.is_algo_accepted(algo)) + + +def plugin_pyblake2_digesters(digfacts): + import pyblake2 # @UnresolvedImport because it is optional. + + def digester_fact(algo_class, fsize): + # A factory that ignores the `fsize` arg. + return algo_class() + + algo_pairs = zip(('BLAKE2S', 'BLAKE2B'), (pyblake2.blake2s, pyblake2.blake2b)) + digfacts.update((algo, fnt.partial(digester_fact, cls)) + for algo, cls in algo_pairs + if digfacts.is_algo_accepted(algo)) diff --git a/setup.py b/setup.py index 0505884..99ff34a 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def read_project_version(): fglobals = {} with io.open(os.path.join( - mydir, 'omnihash', '__init__.py'), encoding='UTF-8') as fd: + mydir, 'omnihash', '_version.py'), encoding='UTF-8') as fd: exec(fd.read(), fglobals) # To read __version__ return fglobals['__version__'] @@ -53,12 +53,12 @@ def read_project_version(): author_email='rich@openwatch.net', entry_points={ 'console_scripts': [ - 'omnihash = omnihash.omnihash:main', - 'oh = omnihash.omnihash:main', + 'omnihash = omnihash.__init__:main', + 'oh = omnihash.__init__:main', ], 'omnihash.plugins': [ - 'a_sha3 = omnihash.omnihash:plugin_sha3_digesters [sha3]', - 'b_pyblake2 = omnihash.omnihash:plugin_pyblake2_digesters [pyblake2]', + 'a_sha3 = omnihash.plugin:plugin_sha3_digesters [sha3]', + 'b_pyblake2 = omnihash.plugin:plugin_pyblake2_digesters [pyblake2]', ], }, extras_require={ diff --git a/tests/test.py b/tests/test.py index d330e4d..e2930c7 100644 --- a/tests/test.py +++ b/tests/test.py @@ -1,11 +1,13 @@ -from omnihash.omnihash import main import os +import re import sys import unittest import click from click.testing import CliRunner +import omnihash as oh + def safe_str(obj): try: @@ -32,34 +34,33 @@ def hello(name): # Main def test_empty(self): runner = CliRunner() - result = runner.invoke(main, catch_exceptions=False) - print(result.output) + result = runner.invoke(oh.main, catch_exceptions=False) + #print(result.output) self.assertEqual(result.exit_code, 0) def test_omnihash(self): runner = CliRunner() - result = runner.invoke(main, ['hashme'], catch_exceptions=False) - print(result.output) + result = runner.invoke(oh.main, ['hashme'], catch_exceptions=False) + #print(result.output) self.assertEqual(result.exit_code, 0) self.assertIn('fb78992e561929a6967d5328f49413fa99048d06', result.output) def test_omnihash2(self): runner = CliRunner() - result = runner.invoke(main, ['hashme', 'asdf'], catch_exceptions=False) + result = runner.invoke(oh.main, ['hashme', 'asdf'], catch_exceptions=False) self.assertEqual(result.exit_code, 0) self.assertIn('fb78992e561929a6967d5328f49413fa99048d06', result.output) def test_omnihashfile(self): runner = CliRunner() - result = runner.invoke(main, ['hashme', 'LICENSE'], catch_exceptions=False) + result = runner.invoke(oh.main, ['hashme', 'LICENSE'], catch_exceptions=False) self.assertEqual(result.exit_code, 0) #print(result.output) self.assertIn('941c986ff0f3e90543dc5e2a0687ee99b19bff67', result.output) def test_omnihashfile_conjecutive(self): - import re runner = CliRunner() - result = runner.invoke(main, 'LICENSE LICENSE -f sha1'.split(), catch_exceptions=False) + result = runner.invoke(oh.main, 'LICENSE LICENSE -f sha1'.split(), catch_exceptions=False) self.assertEqual(result.exit_code, 0) #print(result.output) matches = re.findall('941c986ff0f3e90543dc5e2a0687ee99b19bff67', result.output) @@ -71,7 +72,7 @@ def test_omnihashfile_length(self): fpath = 'LICENSE' text = 'hashme' - result = runner.invoke(main, [text, fpath], catch_exceptions=False) + result = runner.invoke(oh.main, [text, fpath], catch_exceptions=False) self.assertEqual(result.exit_code, 0) self.assertRegex(result.output, r'LENGTH: +%i\D' % len(text)) filelen = os.stat(fpath).st_size @@ -81,13 +82,13 @@ def test_omnihashfile_length(self): def test_omnihashfile_length_zero(self): runner = CliRunner() - result = runner.invoke(main, [''], catch_exceptions=False) + result = runner.invoke(oh.main, [''], catch_exceptions=False) self.assertEqual(result.exit_code, 0) self.assertRegex(result.output, r'LENGTH: +0\D') def test_omnihashf(self): runner = CliRunner() - result = runner.invoke(main, 'Hi -f sha2 -f SHA5'.split(), catch_exceptions=False) + result = runner.invoke(oh.main, 'Hi -f sha2 -f SHA5'.split(), catch_exceptions=False) self.assertEqual(result.exit_code, 0) out = """ SHA224: 7d5104ff2cee331a4586337ea64ab6a188e2b26aecae87227105dae1 @@ -95,61 +96,98 @@ def test_omnihashf(self): SHA512: 45ca55ccaa72b98b86c697fdf73fd364d4815a586f76cd326f1785bb816ff7f1f88b46fb8448b19356ee\ 788eb7d300b9392709a289428070b5810d9b5c2d440d """ - assert result.output.endswith(out) + self.assertIn(out, result.output) - result = runner.invoke(main, 'Hi -c -f sha2 -c -f ITU'.split(), catch_exceptions=False) + result = runner.invoke(oh.main, 'Hi -c -f sha2 -c -f ITU'.split(), catch_exceptions=False) self.assertEqual(result.exit_code, 0) out = """ SHA224: 7d5104ff2cee331a4586337ea64ab6a188e2b26aecae87227105dae1 SHA256: 3639efcd08abb273b1619e82e78c29a7df02c1051b1820e99fc395dcaa3326b8 - CRC-8-ITU: 0xbe + CRC-8-ITU: be """ - print(out) - assert result.output.endswith(out) + #print(out) + self.assertIn(out, result.output) def test_omnihashs(self): runner = CliRunner() - result = runner.invoke(main, ['hashme', 'LICENSE', '-s'], catch_exceptions=False) + result = runner.invoke(oh.main, ['hashme', 'LICENSE', '-s'], catch_exceptions=False) self.assertEqual(result.exit_code, 0) self.assertIn('0398ccd0f49298b10a3d76a47800d2ebecd49859', result.output) def test_omnihashcrc(self): runner = CliRunner() - result = runner.invoke(main, ['hashme', 'README.md', '-sc'], catch_exceptions=False) + result = runner.invoke(oh.main, ['hashme', 'README.md', '-sc'], catch_exceptions=False) self.assertEqual(result.exit_code, 0) - print(result.output) + #print(result.output) self.assertIn('fb78992e561929a6967d5328f49413fa99048d06', result.output) self.assertIn('5d20a7c38be78000', result.output) def test_url(self): runner = CliRunner() - result = runner.invoke(main, ['hashme', 'https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png', '-c']) # noqa + result = runner.invoke(oh.main, ['hashme', + 'https://www.google.com/images/branding/googlelogo/' + '2x/googlelogo_color_272x92dp.png', '-c'], + catch_exceptions=False) self.assertEqual(result.exit_code, 0) - print(result.output) + #print(result.output) self.assertIn('26f471f6ebe3b11557506f6ae96156e0a3852e5b', result.output) self.assertIn('809089', result.output) - result = runner.invoke(main, ['hashme', 'https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png', '-sc']) # noqa + result = runner.invoke(oh.main, ['hashme', 'https://www.google.com/images/branding/googlelogo/' + '2x/googlelogo_color_272x92dp.png', '-sc'], + catch_exceptions=False) self.assertEqual(result.exit_code, 0) - print(result.output) + #print(result.output) self.assertIn('b61bad1cb3dfad6258bef11b12361effebe597a8c80131cd2d6d07fce2206243', result.output) self.assertIn('20d9c2bbdbaf669b', result.output) def test_json(self): runner = CliRunner() - result = runner.invoke(main, ["correct horse battery staple", "-j", "-m", "9cc2"], catch_exceptions=False) + result = runner.invoke(oh.main, ["correct horse battery staple", "-j", "-m", "9cC2"], catch_exceptions=False) self.assertEqual(result.exit_code, 0) - print(result.output) + #print(result.output) self.assertIn('"MD5": "9cc2ae8a1ba7a93da39b46fc1019c481"', result.output) def test_omnihashfile_git(self): runner = CliRunner() - result = runner.invoke(main, 'LICENSE -f git'.split(), catch_exceptions=False) + result = runner.invoke(oh.main, 'LICENSE -f git'.split(), catch_exceptions=False) self.assertEqual(result.exit_code, 0) #print(result.output) self.assertIn('3e108735fcf3efac2b181874a34861a9fb5e7cc1', result.output) self.assertIn('25063c5229e9e558e3207413a1fa56c6262eedc2', result.output) self.assertIn('2c97833c235648e752a00f8ef709fbe2f3523ca4', result.output) + def test_sha3_conjecutive(self): + runner = CliRunner() + result = runner.invoke(oh.main, 'hashme hashme -f sha3_'.split(), catch_exceptions=False) + self.assertEqual(result.exit_code, 0) + if 'SHA3_' not in result.output: + return # SHA3 not installed. + + self.assertEqual(len(re.findall('d1d3e0dafeecb8536c608305715380396486d0566fdca5e104e469c6', + result.output)), 2, 'SHA3_224' + result.output) + self.assertEqual(len(re.findall('80d3abe0d26ba5f08e231bb7787b1df7c007df6d4490e52654bf8566abcea81f', + result.output)), 2, 'SHA3_256' + result.output) + self.assertEqual(len(re.findall('d1d3e0dafeecb8536c608305715380396486d0566fdca5e104e469c6', + result.output)), 2, 'SHA3_384' + result.output) + self.assertEqual(len(re.findall('80d3abe0d26ba5f08e231bb7787b1df7c007df6d4490e52654bf8566abcea81f', + result.output)), 2, 'SHA3_512' + result.output) + + def test_blake2_conjecutive(self): + runner = CliRunner() + result = runner.invoke(oh.main, 'hashme hashme -f BLAKE2'.split(), catch_exceptions=False) + self.assertEqual(result.exit_code, 0) + if 'BLAKE2' not in result.output: + return # BLAKE2 not installed. + + ## NOTE: PY352+ added also BLAKE2 algos, + # so check matches >= 2. + # + self.assertGreaterEqual(len(re.findall('4bb3e5bffb04cd659f791cd4d36cf3f31c0950c916402a871d47e180f47491e8', + result.output)), 2, 'BLAKE2s' + result.output) + self.assertGreaterEqual(len(re.findall('827d2797e521f0bff107cabe1babe0860e4c0ab43dd06476b970cbe2711702bc0' + '99534b8dfa13df74fab8548eedea26763d0f4c3879c4fe514acb0eda69eb68a', + result.output)), 2, 'BLAKE2b' + result.output) + if __name__ == '__main__': unittest.main()