diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..bf620e1 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,4 @@ +[run] +omit = + */_version.py + */__init__.py diff --git a/.gitignore b/.gitignore index 633bd66..1854034 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ *~ .noseids .tox +.eggs +.coverage build/ dist/ doc/__build/* @@ -11,3 +13,4 @@ doc/__build/* *_rsa.pub locale/ pip-log.txt +MANIFEST diff --git a/.travis.yml b/.travis.yml index 893e0b0..d4aba40 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,14 @@ language: python python: - "2.7" - - "3.2" - "3.3" - "3.4" + - "3.5" + - "3.6" install: - pip install . - - pip install nose -script: nosetests + - pip install coveralls +script: + coverage run --source=httpsig setup.py test +after_success: + coveralls diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f78e213..20639b5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,16 @@ httpsig Changes --------------- +1.1.3 (2017-July) +------------------- + +* Support IEFT ``draft-00`` to ``draft-07`` +* Python version update: support ``py3.5``, ``py3.6`` and remove ``py3.2`` (due to coverage not supported) +* Python re-format by using linter. +* Complete unittest and enhance coverage to over 90%. +* Bugfix for python3 string encode and decode. + + 1.1.2 (2015-Feb-11) ------------------- diff --git a/MANIFEST b/MANIFEST deleted file mode 100644 index d969ca8..0000000 --- a/MANIFEST +++ /dev/null @@ -1,20 +0,0 @@ -# file GENERATED by distutils, do NOT edit -CHANGELOG.rst -LICENSE.txt -README.rst -requirements.txt -setup.cfg -setup.py -versioneer.py -httpsig/__init__.py -httpsig/_version.py -httpsig/requests_auth.py -httpsig/sign.py -httpsig/utils.py -httpsig/verify.py -httpsig/tests/__init__.py -httpsig/tests/rsa_private.pem -httpsig/tests/rsa_public.pem -httpsig/tests/test_signature.py -httpsig/tests/test_utils.py -httpsig/tests/test_verify.py diff --git a/MANIFEST.in b/MANIFEST.in index 20b80ef..6226efd 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -2,4 +2,3 @@ include *.rst include *.txt include versioneer.py include httpsig/_version.py -include httpsig/tests/*.pem diff --git a/README.rst b/README.rst index ca0674a..69905f1 100644 --- a/README.rst +++ b/README.rst @@ -3,11 +3,11 @@ httpsig .. image:: https://travis-ci.org/ahknight/httpsig.svg?branch=master :target: https://travis-ci.org/ahknight/httpsig - + .. image:: https://travis-ci.org/ahknight/httpsig.svg?branch=develop :target: https://travis-ci.org/ahknight/httpsig -Sign HTTP requests with secure signatures according to the IETF HTTP Signatures specification (`Draft 3`_). This is a fork of the original module_ to fully support both RSA and HMAC schemes as well as unit test both schemes to prove they work. It's being used in production and is actively-developed. +Sign HTTP requests with secure signatures according to the IETF HTTP Signatures specification from draft 00 to 07. This is a fork of the original module_ to fully support both RSA and HMAC schemes as well as unit test both schemes to prove they work. It's being used in production and is actively-developed. See the original project_, original Python module_, original spec_, and `current IETF draft`_ for more details on the signing scheme. @@ -15,12 +15,11 @@ See the original project_, original Python module_, original spec_, and `current .. _module: https://github.com/zzsnzmn/py-http-signature .. _spec: https://github.com/joyent/node-http-signature/blob/master/http_signing.md .. _`current IETF draft`: https://datatracker.ietf.org/doc/draft-cavage-http-signatures/ -.. _`Draft 3`: http://tools.ietf.org/html/draft-cavage-http-signatures-03 Requirements ------------ -* Python 2.7, 3.2, 3.3, 3.4 +* Python 2.7, 3.3, 3.4, 3.5, 3.6 * PyCrypto_ Optional: @@ -40,23 +39,23 @@ For simple raw signing: .. code:: python import httpsig - + secret = open('rsa_private.pem', 'rb').read() - + sig_maker = httpsig.Signer(secret=secret, algorithm='rsa-sha256') sig_maker.sign('hello world!') For general use with web frameworks: - + .. code:: python import httpsig - - key_id = "Some Key ID" - secret = b'some big secret' - - hs = httpsig.HeaderSigner(key_id, secret, algorithm="hmac-sha256", headers=['(request-target)', 'host', 'date']) - signed_headers_dict = hs.sign({"Date": "Tue, 01 Jan 2014 01:01:01 GMT", "Host": "example.com"}, method="GET", path="/api/1/object/1") + + key_id = 'some key ID' + secret = 'some big secret' + + hs = httpsig.HeaderSigner(key_id, secret, algorithm='hmac-sha256', headers=['(request-target)', 'host', 'date']) + signed_headers_dict = hs.sign({'Date': 'Tue, 01 Jan 2014 01:01:01 GMT', 'Host': 'example.com'}, method='GET', path='/api/1/object/1') For use with requests: @@ -65,11 +64,11 @@ For use with requests: import json import requests from httpsig.requests_auth import HTTPSignatureAuth - + secret = open('rsa_private.pem', 'rb').read() - + auth = HTTPSignatureAuth(key_id='Test', secret=secret) - z = requests.get('https://api.example.com/path/to/endpoint', + z = requests.get('https://api.example.com/path/to/endpoint', auth=auth, headers={'X-Api-Version': '~6.5'}) Class initialization parameters @@ -81,18 +80,19 @@ Note that keys and secrets should be bytes objects. At attempt will be made to httpsig.Signer(secret, algorithm='rsa-sha256') -``secret``, in the case of an RSA signature, is a string containing private RSA pem. In the case of HMAC, it is a secret password. -``algorithm`` is one of the six allowed signatures: ``rsa-sha1``, ``rsa-sha256``, ``rsa-sha512``, ``hmac-sha1``, ``hmac-sha256``, -``hmac-sha512``. +:secret: in the case of an RSA signature, is a string containing private RSA pem. In the case of HMAC, it is a secret password. +:algorithm: is one of the six allowed hash-sign algorithm combinations: ``rsa-sha1``, ``rsa-sha256``, ``rsa-sha512``, ``hmac-sha1``, ``hmac-sha256``, ``hmac-sha512``. .. code:: python - httpsig.requests_auth.HTTPSignatureAuth(key_id, secret, algorithm='rsa-sha256', headers=None) + httpsig.requests_auth.HTTPSignatureAuth(key_id, secret, algorithm='rsa-sha256', headers=None, httpsig_version=None) -``key_id`` is the label by which the server system knows your RSA signature or password. -``headers`` is the list of HTTP headers that are concatenated and used as signing objects. By default it is the specification's minimum, the ``Date`` HTTP header. -``secret`` and ``algorithm`` are as above. +:key_id: is the label by which the server system knows your RSA signature or password. +:headers: is the list of HTTP headers that are concatenated and used as signing objects. By default it is the specification's minimum, the ``Date`` HTTP header. +:httpsig_version: is the IEFT version. By default it is ``draft-07`` and allowed: ``draft-00`` to ``draft-07``. +:secret: as above. +:algorithm: as above. Tests ----- diff --git a/httpsig/_version.py b/httpsig/_version.py index b1a0acd..f511b75 100644 --- a/httpsig/_version.py +++ b/httpsig/_version.py @@ -185,4 +185,3 @@ def get_versions(default={"version": "unknown", "full": ""}, verbose=False): return (versions_from_vcs(tag_prefix, root, verbose) or versions_from_parentdir(parentdir_prefix, root, verbose) or default) - diff --git a/httpsig/requests_auth.py b/httpsig/requests_auth.py index 6a02896..1ddee49 100644 --- a/httpsig/requests_auth.py +++ b/httpsig/requests_auth.py @@ -2,7 +2,7 @@ try: # Python 3 from urllib.parse import urlparse -except ImportError: +except ImportError: # pragma: no cover # Python 2 from urlparse import urlparse @@ -19,19 +19,23 @@ class HTTPSignatureAuth(AuthBase): algorithm is one of the six specified algorithms headers is a list of http headers to be included in the signing string, defaulting to "Date" alone. ''' - def __init__(self, key_id='', secret='', algorithm=None, headers=None): + def __init__(self, key_id='', secret='', algorithm=None, headers=None, httpsig_version=None): headers = headers or [] - self.header_signer = HeaderSigner(key_id=key_id, secret=secret, - algorithm=algorithm, headers=headers) + self.header_signer = HeaderSigner( + key_id=key_id, + secret=secret, + algorithm=algorithm, + headers=headers, + httpsig_version=httpsig_version) self.uses_host = 'host' in [h.lower() for h in headers] - def __call__(self, r): + def __call__(self, request): headers = self.header_signer.sign( - r.headers, - # 'Host' header unavailable in request object at this point - # if 'host' header is needed, extract it from the url - host=urlparse(r.url).netloc if self.uses_host else None, - method=r.method, - path=r.path_url) - r.headers.update(headers) - return r + request.headers, + # 'Host' header unavailable in request object at this point + # if 'host' header is needed, extract it from the url + host=urlparse(request.url).netloc if self.uses_host else None, + method=request.method, + path=request.path_url) + request.headers.update(headers) + return request diff --git a/httpsig/sign.py b/httpsig/sign.py index 7125035..990e263 100644 --- a/httpsig/sign.py +++ b/httpsig/sign.py @@ -5,38 +5,38 @@ from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 -from .utils import * +from .utils import ALGORITHMS, HASHES, HttpSigException, build_signature_template, CaseInsensitiveDict, generate_message -DEFAULT_SIGN_ALGORITHM = "hmac-sha256" +DEFAULT_SIGN_ALGORITHM = 'hmac-sha256' +DEFAULT_HTTPSIG_VERSION = 'draft-07' +SUPPORTED_HTTPSIG_VERSION = ['draft-00', 'draft-01', 'draft-02', 'draft-03', 'draft-04', 'draft-05', 'draft-06', 'draft-07'] class Signer(object): """ When using an RSA algo, the secret is a PEM-encoded private key. When using an HMAC algo, the secret is the HMAC signing secret. - + Password-protected keyfiles are not supported. """ - def __init__(self, secret, algorithm=None): - if algorithm is None: - algorithm = DEFAULT_SIGN_ALGORITHM - - assert algorithm in ALGORITHMS, "Unknown algorithm" - if isinstance(secret, six.string_types): secret = secret.encode("ascii") - + def __init__(self, secret, algorithm=DEFAULT_SIGN_ALGORITHM): + assert algorithm in ALGORITHMS, 'Unknown algorithm' + if isinstance(secret, six.string_types): + secret = secret.encode('ascii') + self._rsa = None self._hash = None self.sign_algorithm, self.hash_algorithm = algorithm.split('-') - + if self.sign_algorithm == 'rsa': try: rsa_key = RSA.importKey(secret) self._rsa = PKCS1_v1_5.new(rsa_key) self._hash = HASHES[self.hash_algorithm] except ValueError: - raise HttpSigException("Invalid key.") - + raise HttpSigException('Invalid key.') + elif self.sign_algorithm == 'hmac': self._hash = HMAC.new(secret, digestmod=HASHES[self.hash_algorithm]) @@ -44,29 +44,32 @@ def __init__(self, secret, algorithm=None): def algorithm(self): return '%s-%s' % (self.sign_algorithm, self.hash_algorithm) - def _sign_rsa(self, data): - if isinstance(data, six.string_types): data = data.encode("ascii") - h = self._hash.new() - h.update(data) - return self._rsa.sign(h) + def __sign_rsa(self, data): + _hash = self._hash.new() + _hash.update(data) + return self._rsa.sign(_hash) - def _sign_hmac(self, data): - if isinstance(data, six.string_types): data = data.encode("ascii") + def __sign_hmac(self, data): hmac = self._hash.copy() hmac.update(data) return hmac.digest() def _sign(self, data): - if isinstance(data, six.string_types): data = data.encode("ascii") + '''return raw signed string''' signed = None if self._rsa: - signed = self._sign_rsa(data) + signed = self.__sign_rsa(data) elif self._hash: - signed = self._sign_hmac(data) + signed = self.__sign_hmac(data) if not signed: raise SystemError('No valid encryptor found.') - return base64.b64encode(signed).decode("ascii") + return signed + def sign(self, data): + if isinstance(data, six.string_types): + data = data.encode('ascii') + + return base64.b64encode(self._sign(data)).decode('ascii') class HeaderSigner(Signer): ''' @@ -78,13 +81,15 @@ class HeaderSigner(Signer): :arg algorithm: one of the six specified algorithms :arg headers: a list of http headers to be included in the signing string, defaulting to ['date']. ''' - def __init__(self, key_id, secret, algorithm=None, headers=None): - if algorithm is None: - algorithm = DEFAULT_SIGN_ALGORITHM - + def __init__(self, key_id, secret, algorithm=DEFAULT_SIGN_ALGORITHM, headers=None, httpsig_version=DEFAULT_HTTPSIG_VERSION): super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm) self.headers = headers or ['date'] self.signature_template = build_signature_template(key_id, algorithm, headers) + if httpsig_version in SUPPORTED_HTTPSIG_VERSION: + self.httpsig_version = httpsig_version + else: + self.httpsig_version = DEFAULT_HTTPSIG_VERSION + self._verify_headers_by_draft_version() def sign(self, headers, host=None, method=None, path=None): """ @@ -96,11 +101,21 @@ def sign(self, headers, host=None, method=None, path=None): path is the HTTP path (required when using '(request-target)'). """ headers = CaseInsensitiveDict(headers) - required_headers = self.headers or ['date'] - signable = generate_message(required_headers, headers, host, method, path) - - signature = self._sign(signable) + signable = generate_message(self.headers, headers, host, method, path) + + signature = super(HeaderSigner, self).sign(signable) headers['authorization'] = self.signature_template % signature - + return headers + def _verify_headers_by_draft_version(self): + HEADER_MAPING = { + 'request-line': ['draft-00', 'draft-01'], + '(request-line)': ['draft-02'], + '(request-target)': ['draft-03', 'draft-04', 'draft-05', 'draft-06', 'draft-07'] + } + + for header, drafts in HEADER_MAPING.items(): + if header in self.headers: + if self.httpsig_version not in drafts: + raise KeyError('%s is not supported by %s' % (header, self.httpsig_version)) diff --git a/httpsig/tests/__init__.py b/httpsig/tests/__init__.py deleted file mode 100644 index 72d4383..0000000 --- a/httpsig/tests/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .test_signature import * -from .test_utils import * -from .test_verify import * \ No newline at end of file diff --git a/httpsig/tests/test_signature.py b/httpsig/tests/test_signature.py deleted file mode 100755 index 00ed29d..0000000 --- a/httpsig/tests/test_signature.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -import json -import unittest - -import httpsig.sign as sign -from httpsig.utils import parse_authorization_header - - -sign.DEFAULT_SIGN_ALGORITHM = "rsa-sha256" - - -class TestSign(unittest.TestCase): - - def setUp(self): - self.key_path = os.path.join(os.path.dirname(__file__), 'rsa_private.pem') - with open(self.key_path, 'rb') as f: - self.key = f.read() - - def test_default(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key) - unsigned = { - 'Date': 'Thu, 05 Jan 2012 21:31:40 GMT' - } - signed = hs.sign(unsigned) - self.assertIn('Date', signed) - self.assertEqual(unsigned['Date'], signed['Date']) - self.assertIn('Authorization', signed) - auth = parse_authorization_header(signed['authorization']) - params = auth[1] - self.assertIn('keyId', params) - self.assertIn('algorithm', params) - self.assertIn('signature', params) - self.assertEqual(params['keyId'], 'Test') - self.assertEqual(params['algorithm'], 'rsa-sha256') - self.assertEqual(params['signature'], 'ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=') - - def test_all(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key, headers=[ - '(request-target)', - 'host', - 'date', - 'content-type', - 'content-md5', - 'content-length' - ]) - unsigned = { - 'Host': 'example.com', - 'Date': 'Thu, 05 Jan 2012 21:31:40 GMT', - 'Content-Type': 'application/json', - 'Content-MD5': 'Sd/dVLAcvNLSq16eXua5uQ==', - 'Content-Length': '18', - } - signed = hs.sign(unsigned, method='POST', path='/foo?param=value&pet=dog') - - self.assertIn('Date', signed) - self.assertEqual(unsigned['Date'], signed['Date']) - self.assertIn('Authorization', signed) - auth = parse_authorization_header(signed['authorization']) - params = auth[1] - self.assertIn('keyId', params) - self.assertIn('algorithm', params) - self.assertIn('signature', params) - self.assertEqual(params['keyId'], 'Test') - self.assertEqual(params['algorithm'], 'rsa-sha256') - self.assertEqual(params['headers'], '(request-target) host date content-type content-md5 content-length') - self.assertEqual(params['signature'], 'G8/Uh6BBDaqldRi3VfFfklHSFoq8CMt5NUZiepq0q66e+fS3Up3BmXn0NbUnr3L1WgAAZGplifRAJqp2LgeZ5gXNk6UX9zV3hw5BERLWscWXlwX/dvHQES27lGRCvyFv3djHP6Plfd5mhPWRkmjnvqeOOSS0lZJYFYHJz994s6w=') diff --git a/httpsig/tests/test_utils.py b/httpsig/tests/test_utils.py deleted file mode 100755 index 6d79f69..0000000 --- a/httpsig/tests/test_utils.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python -import os -import re -import sys -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -import unittest - -from httpsig.utils import get_fingerprint - -class TestUtils(unittest.TestCase): - - def test_get_fingerprint(self): - with open(os.path.join(os.path.dirname(__file__), 'rsa_public.pem'), 'r') as k: - key = k.read() - fingerprint = get_fingerprint(key) - self.assertEqual(fingerprint, "73:61:a2:21:67:e0:df:be:7e:4b:93:1e:15:98:a5:b7") diff --git a/httpsig/utils.py b/httpsig/utils.py index b34e3fa..d004e8e 100644 --- a/httpsig/utils.py +++ b/httpsig/utils.py @@ -1,21 +1,20 @@ import re -import struct import hashlib import base64 +from functools import reduce import six try: # Python 3 from urllib.request import parse_http_list -except ImportError: +except ImportError: # pragma: no cover # Python 2 from urllib2 import parse_http_list -from Crypto.PublicKey import RSA from Crypto.Hash import SHA, SHA256, SHA512 ALGORITHMS = frozenset(['rsa-sha1', 'rsa-sha256', 'rsa-sha512', 'hmac-sha1', 'hmac-sha256', 'hmac-sha512']) -HASHES = {'sha1': SHA, +HASHES = {'sha1': SHA, 'sha256': SHA256, 'sha512': SHA512} @@ -23,95 +22,99 @@ class HttpSigException(Exception): pass -""" -Constant-time string compare. -http://codahale.com/a-lesson-in-timing-attacks/ -""" -def ct_bytes_compare(a, b): - if not isinstance(a, six.binary_type): - a = a.decode('utf8') - if not isinstance(b, six.binary_type): - b = b.decode('utf8') - - if len(a) != len(b): + +def ct_bytes_compare(byte_l, byte_r): + """ + Constant-time string compare. + http://codahale.com/a-lesson-in-timing-attacks/ + """ + if not isinstance(byte_l, six.binary_type): + byte_l = byte_l.encode('utf8') + if not isinstance(byte_r, six.binary_type): + byte_r = byte_r.encode('utf8') + + if len(byte_l) != len(byte_r): return False - result = 0 - for x, y in zip(a, b): - if six.PY2: - result |= ord(x) ^ ord(y) - else: - result |= x ^ y - - return (result == 0) + result = reduce(lambda r, b: r | (b[0] ^ b[1]), map(lambda b: (ord(b[0]), ord(b[1])) if six.PY2 else b, zip(byte_l, byte_r)), 0) + + return result == 0 -def generate_message(required_headers, headers, host=None, method=None, path=None): + +def generate_message(required_headers, headers, host=None, method=None, path=None, http_version=None): headers = CaseInsensitiveDict(headers) - + if not required_headers: required_headers = ['date'] - + signable_list = [] - for h in required_headers: - h = h.lower() - if h == '(request-target)': + for header in required_headers: + header = header.lower() + if header == '(request-target)': # draft-03 to draft-07 if not method or not path: - raise Exception('method and path arguments required when using "(request-target)"') - signable_list.append('%s: %s %s' % (h, method.lower(), path)) - - elif h == 'host': + raise KeyError('method and path arguments required when using "(request-target)"') + signable_list.append('%s: %s %s' % (header, method.lower(), path)) + elif header == '(request-line)': # draft-02 + if not method or not path: + raise KeyError('method and path arguments required when using "(request-line)"') + signable_list.append('%s: %s %s' % (header, method.lower(), path)) + elif header == 'request-line': # draft-00, draft-01 + if not method or not path or not http_version: + raise KeyError('method, path and http_version arguments required when using "request-line"') + signable_list.append('%s %s %s' % (method, path, http_version)) + + elif header == 'host': # 'host' special case due to requests lib restrictions # 'host' is not available when adding auth so must use a param # if no param used, defaults back to the 'host' header if not host: if 'host' in headers: - host = headers[h] + host = headers[header] else: - raise Exception('missing required header "%s"' % (h)) - signable_list.append('%s: %s' % (h, host)) + raise KeyError('missing required header "%s"' % (header)) + signable_list.append('%s: %s' % (header, host)) else: - if h not in headers: - raise Exception('missing required header "%s"' % (h)) + if header not in headers: + raise KeyError('missing required header "%s"' % (header)) - signable_list.append('%s: %s' % (h, headers[h])) + signable_list.append('%s: %s' % (header, headers[header])) - signable = '\n'.join(signable_list).encode("ascii") + signable = '\n'.join(signable_list).encode('ascii') return signable def parse_authorization_header(header): if not isinstance(header, six.string_types): - header = header.decode("ascii") #HTTP headers cannot be Unicode. - - auth = header.split(" ", 1) - if len(auth) > 2: + header = header.decode('ascii') # HTTP headers cannot be Unicode. + + auth = header.split(' ', 1) + if len(auth) < 2: raise ValueError('Invalid authorization header. (eg. Method key1=value1,key2="value, \"2\"")') - + # Split up any args into a dictionary. values = {} - if len(auth) == 2: - auth_value = auth[1] - if auth_value and len(auth_value): - # This is tricky string magic. Let urllib do it. - fields = parse_http_list(auth_value) - - for item in fields: - # Only include keypairs. - if '=' in item: - # Split on the first '=' only. - key, value = item.split('=', 1) - if not (len(key) and len(value)): - continue - - # Unquote values, if quoted. - if value[0] == '"': - value = value[1:-1] - - values[key] = value - + auth_value = auth[1] + + # This is tricky string magic. Let urllib do it. + fields = parse_http_list(auth_value) + for item in fields: + # Only include keypairs. + if '=' in item: + # Split on the first '=' only. + key, value = item.split('=', 1) + if not (len(key) and len(value)): + continue + + # Unquote values, if quoted. + if value[0] == '"': + value = value[1:-1] + + values[key] = value + # ("Signature", {"headers": "date", "algorithm": "hmac-sha256", ... }) return (auth[0], CaseInsensitiveDict(values)) + def build_signature_template(key_id, algorithm, headers): """ Build the Signature template for use with the Authorization header. @@ -128,27 +131,12 @@ def build_signature_template(key_id, algorithm, headers): if headers: headers = [h.lower() for h in headers] param_map['headers'] = ' '.join(headers) - kv = map('{0[0]}="{0[1]}"'.format, param_map.items()) - kv_string = ','.join(kv) + kv_pairs = map('{0[0]}="{0[1]}"'.format, param_map.items()) + kv_string = ','.join(kv_pairs) sig_string = 'Signature {0}'.format(kv_string) return sig_string -def lkv(d): - parts = [] - while d: - len = struct.unpack('>I', d[:4])[0] - bits = d[4:len+4] - parts.append(bits) - d = d[len+4:] - return parts - -def sig(d): - return lkv(d)[1] - -def is_rsa(keyobj): - return lkv(keyobj.blob)[0] == "ssh-rsa" - # based on http://stackoverflow.com/a/2082169/151401 class CaseInsensitiveDict(dict): def __init__(self, d=None, **kwargs): @@ -165,6 +153,7 @@ def __getitem__(self, key): def __contains__(self, key): return super(CaseInsensitiveDict, self).__contains__(key.lower()) + # currently busted... def get_fingerprint(key): """ @@ -182,5 +171,4 @@ def get_fingerprint(key): key = key.strip().encode('ascii') key = base64.b64decode(key) fp_plain = hashlib.md5(key).hexdigest() - return ':'.join(a+b for a,b in zip(fp_plain[::2], fp_plain[1::2])) - + return ':'.join(a + b for a, b in zip(fp_plain[::2], fp_plain[1::2])) diff --git a/httpsig/verify.py b/httpsig/verify.py index a6e1ba3..a1931d6 100644 --- a/httpsig/verify.py +++ b/httpsig/verify.py @@ -1,15 +1,11 @@ """ Module to assist in verifying a signed header. """ -import six - -from Crypto.Hash import HMAC -from Crypto.PublicKey import RSA -from Crypto.Signature import PKCS1_v1_5 from base64 import b64decode +import six from .sign import Signer -from .utils import * +from .utils import HttpSigException, ct_bytes_compare, parse_authorization_header, CaseInsensitiveDict, generate_message class Verifier(Signer): @@ -24,20 +20,23 @@ def _verify(self, data, signature): `data` is the message to verify `signature` is a base64-encoded signature to verify against `data` """ - - if isinstance(data, six.string_types): data = data.encode("ascii") - if isinstance(signature, six.string_types): signature = signature.encode("ascii") - + + if isinstance(data, six.string_types): + data = data.encode('ascii') + + if isinstance(signature, six.string_types): + signature = signature.encode('ascii') + if self.sign_algorithm == 'rsa': - h = self._hash.new() - h.update(data) - return self._rsa.verify(h, b64decode(signature)) - + hash_ = self._hash.new() + hash_.update(data) + return self._rsa.verify(hash_, b64decode(signature)) + elif self.sign_algorithm == 'hmac': - h = self._sign_hmac(data) - s = b64decode(signature) - return ct_bytes_compare(h, s) - + signed_hmac = self._sign(data) + decoded_sig = b64decode(signature) + return ct_bytes_compare(signed_hmac, decoded_sig) + else: raise HttpSigException("Unsupported algorithm.") @@ -49,7 +48,7 @@ class HeaderVerifier(Verifier): def __init__(self, headers, secret, required_headers=None, method=None, path=None, host=None): """ Instantiate a HeaderVerifier object. - + :param headers: A dictionary of headers from the HTTP request. :param secret: The HMAC secret or RSA *public* key. :param required_headers: Optional. A list of headers required to be present to validate, even if the signature is otherwise valid. Defaults to ['date']. @@ -58,33 +57,28 @@ def __init__(self, headers, secret, required_headers=None, method=None, path=Non :param host: Optional. The value to use for the Host header, if not supplied in :param:headers. """ required_headers = required_headers or ['date'] - - auth = parse_authorization_header(headers['authorization']) - if len(auth) == 2: - self.auth_dict = auth[1] - else: - raise HttpSigException("Invalid authorization header.") - + + self.auth_dict = parse_authorization_header(headers['authorization'])[1] self.headers = CaseInsensitiveDict(headers) self.required_headers = [s.lower() for s in required_headers] self.method = method self.path = path self.host = host - + super(HeaderVerifier, self).__init__(secret, algorithm=self.auth_dict['algorithm']) def verify(self): """ Verify the headers based on the arguments passed at creation and current properties. - + Raises an Exception if a required header (:param:required_headers) is not found in the signature. Returns True or False. """ auth_headers = self.auth_dict.get('headers', 'date').split(' ') - - if len(set(self.required_headers) - set(auth_headers)) > 0: - raise Exception('{} is a required header(s)'.format(', '.join(set(self.required_headers)-set(auth_headers)))) - + + if set(self.required_headers) - set(auth_headers): + raise Exception('{} is a required header(s)'.format(', '.join(set(self.required_headers) - set(auth_headers)))) + signing_str = generate_message(auth_headers, self.headers, self.host, self.method, self.path) - + return self._verify(signing_str, self.auth_dict['signature']) diff --git a/setup.cfg b/setup.cfg index 3cbd55d..c1b9862 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,5 @@ [bdist_wheel] -universal = True \ No newline at end of file +universal = True + +[nosetests] +nocapture = 1 diff --git a/setup.py b/setup.py index 82cf121..e895820 100755 --- a/setup.py +++ b/setup.py @@ -28,9 +28,10 @@ "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 2.7", - "Programming Language :: Python :: 3.2", "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", "Topic :: Internet :: WWW/HTTP", "Topic :: Software Development :: Libraries :: Python Modules", ], @@ -43,5 +44,6 @@ include_package_data=True, zip_safe=True, install_requires=['pycrypto', 'six'], - test_suite="httpsig.tests", + test_suite='nose.collector', + tests_require=['nose', 'requests'] ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/httpsig/tests/rsa_private.pem b/tests/rsa_private.pem similarity index 100% rename from httpsig/tests/rsa_private.pem rename to tests/rsa_private.pem diff --git a/httpsig/tests/rsa_public.pem b/tests/rsa_public.pem similarity index 100% rename from httpsig/tests/rsa_public.pem rename to tests/rsa_public.pem diff --git a/tests/test_request_auth.py b/tests/test_request_auth.py new file mode 100644 index 0000000..6b42bea --- /dev/null +++ b/tests/test_request_auth.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +import os +import unittest +import requests + +from httpsig.requests_auth import HTTPSignatureAuth + + +class TestHttpSignatureAuth(unittest.TestCase): + + def setUp(self): + private_key_path = os.path.join(os.path.dirname(__file__), 'rsa_private.pem') + with open(private_key_path, 'rb') as f: + private_key = f.read() + + public_key_path = os.path.join(os.path.dirname(__file__), 'rsa_public.pem') + with open(public_key_path, 'rb') as f: + public_key = f.read() + + self.keyId = "Test" + self.algorithm = "rsa-sha256" + self.sign_secret = private_key + self.verify_secret = public_key + + def test__call_(self): + auth = HTTPSignatureAuth( + key_id=self.keyId, + secret=self.sign_secret, + algorithm=self.algorithm, + httpsig_version='draft-07', + headers=[ + 'date' + ]) + + request = requests.Request( + method='post', + url='http://example.com/foo?param=value&pet=dog', + auth=auth, + headers={ + 'host': 'example.com', + 'date': 'Thu, 05 Jan 2012 21:31:40 GMT', + 'content-type': 'application/json', + 'digest': 'SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=', + 'content-length': '18' + }).prepare() + + assert 'ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=' in request.headers['authorization'] diff --git a/tests/test_sign.py b/tests/test_sign.py new file mode 100644 index 0000000..99b7094 --- /dev/null +++ b/tests/test_sign.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python +import os +import unittest +import six + +import httpsig.sign as sign +from httpsig.utils import parse_authorization_header, HttpSigException + + +class TestSign(unittest.TestCase): + + def setUp(self): + self.key_path = os.path.join(os.path.dirname(__file__), 'rsa_private.pem') + with open(self.key_path, 'rb') as f: + self.key = f.read() + + def test_default(self): + hs = sign.HeaderSigner(key_id='Test', secret=self.key, algorithm='rsa-sha256') + unsigned = { + 'Date': 'Thu, 05 Jan 2012 21:31:40 GMT' + } + signed = hs.sign(unsigned) + self.assertIn('Date', signed) + self.assertEqual(unsigned['Date'], signed['Date']) + self.assertIn('Authorization', signed) + auth = parse_authorization_header(signed['authorization']) + params = auth[1] + self.assertIn('keyId', params) + self.assertIn('algorithm', params) + self.assertIn('signature', params) + self.assertEqual(params['keyId'], 'Test') + self.assertEqual(params['algorithm'], 'rsa-sha256') + self.assertEqual(params['signature'], 'ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=') + + def test_all(self): + hs = sign.HeaderSigner(key_id='Test', secret=self.key, algorithm='rsa-sha256', headers=[ + '(request-target)', + 'host', + 'date', + 'content-type', + 'content-md5', + 'content-length' + ]) + unsigned = { + 'Host': 'example.com', + 'Date': 'Thu, 05 Jan 2012 21:31:40 GMT', + 'Content-Type': 'application/json', + 'Content-MD5': 'Sd/dVLAcvNLSq16eXua5uQ==', + 'Content-Length': '18', + } + signed = hs.sign(unsigned, method='POST', path='/foo?param=value&pet=dog') + + self.assertIn('Date', signed) + self.assertEqual(unsigned['Date'], signed['Date']) + self.assertIn('Authorization', signed) + auth = parse_authorization_header(signed['authorization']) + params = auth[1] + self.assertIn('keyId', params) + self.assertIn('algorithm', params) + self.assertIn('signature', params) + self.assertEqual(params['keyId'], 'Test') + self.assertEqual(params['algorithm'], 'rsa-sha256') + self.assertEqual(params['headers'], '(request-target) host date content-type content-md5 content-length') + self.assertEqual(params['signature'], 'G8/Uh6BBDaqldRi3VfFfklHSFoq8CMt5NUZiepq0q66e+fS3Up3BmXn0NbUnr3L1WgAAZGplifRAJqp2LgeZ5gXNk6UX9zV3hw5BERLWscWXlwX/dvHQES27lGRCvyFv3djHP6Plfd5mhPWRkmjnvqeOOSS0lZJYFYHJz994s6w=') + + def test_PASS_verify_headers_by_draft_version(self): + testcases = { + 'draft-00': 'request-line', + 'draft-01': 'request-line', + 'draft-02': '(request-line)', + 'draft-03': '(request-target)', + 'draft-04': '(request-target)', + 'draft-05': '(request-target)', + 'draft-06': '(request-target)', + 'draft-07': '(request-target)', + None: '(request-target)' + } + for httpsig_version, header_req in testcases.items(): + hs = sign.HeaderSigner(key_id='Test', secret=self.key, httpsig_version=httpsig_version, headers=[ + header_req, + 'host', + 'date', + 'content-type', + 'content-md5', + 'content-length' + ]) + + def test_FAIL_verify_headers_by_draft_version(self): + testcases = [ + ('draft-00', '(request-line)'), + ('draft-00', '(request-target)'), + ('draft-01', '(request-line)'), + ('draft-01', '(request-target)'), + ('draft-02', 'request-line'), + ('draft-02', '(request-target)'), + ('draft-03', 'request-line'), + ('draft-03', '(request-line)'), + ('draft-04', 'request-line'), + ('draft-04', '(request-line)'), + ('draft-05', 'request-line'), + ('draft-05', '(request-line)'), + ('draft-06', 'request-line'), + ('draft-06', '(request-line)'), + ('draft-07', 'request-line'), + ('draft-07', '(request-line)'), + (None, 'request-line'), + (None, '(request-line)'), + ] + for httpsig_version, header_req in testcases: + try: + sign.HeaderSigner(key_id='Test', secret=self.key, httpsig_version=httpsig_version, headers=[ + header_req, + 'host', + 'date', + 'content-type', + 'content-md5', + 'content-length' + ]) + self.fail('Should raise KeyError in (%s, %s)' % (httpsig_version, header_req)) + except KeyError: + pass + + def test_signer_init(self): + + signature = 'ksTqAWZHgRktH3uKbXydPNEMqpjqALFiJVJXeCKj8zA=' + secret = 'i am a secret' + data = 'i am some data' + algorithm = 'hmac-sha256' + + # test good case: default string type + signer = sign.Signer(secret, algorithm=algorithm) + self.assertEqual(signer.sign(data), signature) + self.assertEqual(signer.algorithm, algorithm) + + # test good case: unicode/byte string type for secret + signer = sign.Signer(secret.encode() if six.PY3 else secret.decode(), algorithm=algorithm) + self.assertEqual(signer.sign(data), signature) + + # test good case: unicode/byte string type for data + signer = sign.Signer(secret, algorithm=algorithm) + self.assertEqual(signer.sign(data.encode() if six.PY3 else data.decode()), signature) + + # test invalid hash algorithm + weird_algorithm = 'rsa-sha257' + original_algorithms = sign.ALGORITHMS + sign.ALGORITHMS = frozenset([weird_algorithm]) + try: + signer = sign.Signer(secret, algorithm=weird_algorithm) + self.fail('No exception was raised') + except Exception as ex: + self.assertIsInstance(ex, HttpSigException) + finally: + sign.ALGORITHMS = original_algorithms + + # test invalid sign algorithm + weird_algorithm = 'xxx-sha256' + original_algorithms = sign.ALGORITHMS + sign.ALGORITHMS = frozenset([weird_algorithm]) + try: + signer = sign.Signer(secret, algorithm=weird_algorithm) + signer.sign('nothing') + self.fail('No exception was raised') + except Exception as ex: + self.assertIsInstance(ex, SystemError) + finally: + sign.ALGORITHMS = original_algorithms diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..e2e50cd --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import os +import unittest +import six +import httpsig.utils + + +class TestUtils(unittest.TestCase): + + def test_ct_bytes_compare(self): + if six.PY3: + test_cases = [ + (b"\xa5Q\x7f\x92q\x8b+\xd5\x83x'^", b"\xa5Q\x7f\x92q\x8b+\xd5\x83x'^", True), + ('123', '123', True), + (b'123', '123', True), + (b'123', b'123', True), + (b'\xe5\xb0\x8d', '對', True), + (b'123456', b'123', False) + ] + elif six.PY2: + test_cases = [ + ("\xa5Q\x7f\x92q\x8b+\xd5\x83x'^", "\xa5Q\x7f\x92q\x8b+\xd5\x83x'^", True), + ('123', '123', True), + ('123', u'123', True), + (u'123', u'123', True), + ('\xe5\xb0\x8d', u'對', True), + ('123456', '123', False) + ] + + for case in test_cases: + self.assertIs(httpsig.utils.ct_bytes_compare(case[0], case[1]), case[2]) + + def test_generate_message(self): + HOST = "example.org" + METHOD = "POST" + PATH = '/foo' + HTTP_VERSION = 'HTTP/1.1' + HEADERS = { + 'Host': 'example.org', + 'Date': 'Tue, 07 Jun 2014 20:51:35 GMT', + 'Content-Type': 'application/json', + 'Digest': 'SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=', + 'Content-Length': 18 + } + good_test_cases = { + # from draft-01: 3.1.2. RSA Example + 'request-line host date digest content-length': b'POST /foo HTTP/1.1\nhost: example.org\ndate: Tue, 07 Jun 2014 20:51:35 GMT\ndigest: SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=\ncontent-length: 18', + # draft-02 + '(request-line) host date digest content-length': b'(request-line): post /foo\nhost: example.org\ndate: Tue, 07 Jun 2014 20:51:35 GMT\ndigest: SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=\ncontent-length: 18', + # draft-03 + '(request-target) host date digest content-length': b'(request-target): post /foo\nhost: example.org\ndate: Tue, 07 Jun 2014 20:51:35 GMT\ndigest: SHA-256=X48E9qOokqqrvdts8nOJRJN3OWDUoyWxBf7kbu9DBPE=\ncontent-length: 18' + } + + for required_headers, result in good_test_cases.items(): + assert httpsig.utils.generate_message( + required_headers=required_headers.split(), + headers=HEADERS, + host=HOST, + method=METHOD, + path=PATH, + http_version=HTTP_VERSION) == result, 'header: %s\nexpect: %s\n' % (required_headers, result) + + exception_test_cases = [ + {'required_headers': ['(request-target)'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': '', 'http_version': ''}, + {'required_headers': ['(request-target)'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': 'GET', 'path': '', 'http_version': ''}, + {'required_headers': ['(request-target)'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': '/', 'http_version': ''}, + {'required_headers': ['(request-line)'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': ''}, + {'required_headers': ['(request-line)'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': 'GET', 'path': '', 'http_version': ''}, + {'required_headers': ['(request-line)'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': '/', 'http_version': ''}, + {'required_headers': ['request-line'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': '', 'http_version': ''}, + {'required_headers': ['request-line'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': 'GET', 'path': '', 'http_version': ''}, + {'required_headers': ['request-line'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': '/', 'http_version': ''}, + {'required_headers': ['request-line'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': '', 'http_version': 'HTTP/1.1'}, + {'required_headers': ['request-line'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': 'GET', 'path': '', 'http_version': 'HTTP/1.1'}, + {'required_headers': ['request-line'], 'host': HOST, 'headers': HEADERS, 'exception': KeyError, 'method': '', 'path': '/', 'http_version': 'HTTP/1.1'}, + {'required_headers': [], 'host': HOST, 'headers': [], 'exception': KeyError, 'method': '', 'path': '', 'http_version': ''}, + {'required_headers': ['haha'], 'host': HOST, 'headers': [], 'exception': KeyError, 'method': '', 'path': '', 'http_version': ''}, + {'required_headers': ['host'], 'host': '', 'headers': [], 'exception': KeyError, 'method': '', 'path': '', 'http_version': ''} + ] + for case in exception_test_cases: + try: + httpsig.utils.generate_message( + required_headers=case['required_headers'], + headers=case['headers'], + host=case['host'], + method=case['method'], + path=case['path'], + http_version=case['http_version']) + self.fail('No exception was raised') + except Exception as ex: + self.assertIsInstance(ex, case['exception'], str(case)) + + def test_parse_authorization_header(self): + if six.PY3: + good_test_cases = [ + ( + 'Signature keyId="Test",algorithm="rsa-sha512",headers="(request-target) host date content-type content-md5 content-length"', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'algorithm': 'rsa-sha512', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ), + ( + b'Signature keyId="Test",algorithm="rsa-sha512",headers="(request-target) host date content-type content-md5 content-length"', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'algorithm': 'rsa-sha512', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ), + ( + 'Signature keyId="Test",algorithm=,headers="(request-target) host date content-type content-md5 content-length",=123', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ), + ( + 'Signature keyId="Test", algorithm="rsa-sha512" ,headers="(request-target) host date content-type content-md5 content-length"', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'algorithm': 'rsa-sha512', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ) + ] + elif six.PY2: + good_test_cases = [ + ( + 'Signature keyId="Test",algorithm="rsa-sha512",headers="(request-target) host date content-type content-md5 content-length"', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'algorithm': 'rsa-sha512', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ), + ( + u'Signature keyId="Test",algorithm="rsa-sha512",headers="(request-target) host date content-type content-md5 content-length"', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'algorithm': 'rsa-sha512', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ), + ( + 'Signature keyId="Test",algorithm=,headers="(request-target) host date content-type content-md5 content-length",=123', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ), + ( + 'Signature keyId="Test", algorithm="rsa-sha512" ,headers="(request-target) host date content-type content-md5 content-length"', + ('Signature', httpsig.utils.CaseInsensitiveDict({ + 'keyId': 'Test', + 'algorithm': 'rsa-sha512', + 'headers': '(request-target) host date content-type content-md5 content-length'})) + ) + ] + + for case in good_test_cases: + self.assertEqual(httpsig.utils.parse_authorization_header(case[0]), case[1]) + + exception_test_cases = [ + ('Signature-keyId="Test",algorithm="rsa-sha512"', ValueError), + ('', ValueError), + (123, AttributeError) + ] + for case in exception_test_cases: + try: + httpsig.utils.parse_authorization_header(case[0]) + self.fail('No exception was raised.') + except Exception as ex: + self.assertIsInstance(ex, case[1]) + + def test_get_fingerprint(self): + with open(os.path.join(os.path.dirname(__file__), 'rsa_public.pem'), 'r') as k: + key = k.read() + fingerprint = httpsig.utils.get_fingerprint(key) + self.assertEqual(fingerprint, "73:61:a2:21:67:e0:df:be:7e:4b:93:1e:15:98:a5:b7") + + test_case = [('ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDFuqGVUQjw8KtwY+JIZx4uGhy2ap4QLjtUOaH/o8vxUeAk7P5Olhxzr2FBnCUjS6iZmuzZzviXI3NhyR2ic661hFlXxkJaEa6DruRakZ6P+uMFPmvE+RsOp0ppcW2uGO5Y8C0OqEMI4NT2E4/LIzM7kmspF7cvJajUa9UQ6ZpKG/YfZpOs6xug8uT+1GCiPC+w/GX2UWtj2kmTUUJZWddSev9kHUDbPl6GwLMmnJ3UB9C7rdNlhupArJAsL+7eAXR9DcV5Fo7kDtFYiZllwRAWVghVjeGyEkSOEEbVCtI+l+V2cFYlta1mPXJQsshKvzYQ25IjnDBnXGg/HpwCppWT httpsig', '99:18:ae:30:be:c4:12:ce:b5:17:2b:56:ee:a9:ab:23')] + + for case in test_case: + self.assertEqual(httpsig.utils.get_fingerprint(case[0]), case[1]) diff --git a/httpsig/tests/test_verify.py b/tests/test_verify.py old mode 100755 new mode 100644 similarity index 79% rename from httpsig/tests/test_verify.py rename to tests/test_verify.py index f49eeb3..ffab48d --- a/httpsig/tests/test_verify.py +++ b/tests/test_verify.py @@ -1,13 +1,14 @@ #!/usr/bin/env python -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) -import json +import os +import six import unittest from httpsig.sign import HeaderSigner, Signer from httpsig.verify import HeaderVerifier, Verifier +from httpsig.utils import HttpSigException +import httpsig.utils + class BaseTestCase(unittest.TestCase): def _parse_auth(self, auth): @@ -23,33 +24,39 @@ def _parse_auth(self, auth): param_dict = {k: v.strip('"') for k, v in param_pairs} return param_dict - + class TestVerifyHMACSHA1(BaseTestCase): def setUp(self): secret = b"something special goes here" - + self.keyId = "Test" self.algorithm = "hmac-sha1" self.sign_secret = secret self.verify_secret = secret - + def test_basic_sign(self): signer = Signer(secret=self.sign_secret, algorithm=self.algorithm) verifier = Verifier(secret=self.verify_secret, algorithm=self.algorithm) - GOOD = b"this is a test" - BAD = b"this is not the signature you were looking for..." - + good_case = b"this is a test" + # BAD = b"this is not the signature you were looking for..." + + if six.PY3: + bad_test_cases = ['this is not the signature you were looking for...', b'this is not the signature you were looking for...'] + if six.PY2: + bad_test_cases = [u'this is not the signature you were looking for...', 'this is not the signature you were looking for...'] + # generate signed string - signature = signer._sign(GOOD) - self.assertTrue(verifier._verify(data=GOOD, signature=signature)) - self.assertFalse(verifier._verify(data=BAD, signature=signature)) + signature = signer.sign(good_case) + self.assertTrue(verifier._verify(data=good_case, signature=signature)) + for case in bad_test_cases: + self.assertFalse(verifier._verify(data=case, signature=signature)) def test_default(self): unsigned = { 'Date': 'Thu, 05 Jan 2012 21:31:40 GMT' } - + hs = HeaderSigner(key_id="Test", secret=self.sign_secret, algorithm=self.algorithm) signed = hs.sign(unsigned) hv = HeaderVerifier(headers=signed, secret=self.verify_secret) @@ -75,7 +82,7 @@ def test_signed_headers(self): 'Content-Length': '18', } signed = hs.sign(unsigned, method=METHOD, path=PATH) - + hv = HeaderVerifier(headers=signed, secret=self.verify_secret, host=HOST, method=METHOD, path=PATH) self.assertTrue(hv.verify()) @@ -129,12 +136,29 @@ def test_extra_auth_headers(self): hv = HeaderVerifier(headers=signed, secret=self.verify_secret, method=METHOD, path=PATH, required_headers=['date', '(request-target)']) self.assertTrue(hv.verify()) + def test_unsupported_algorithm(self): + unsigned = { + 'Date': 'Thu, 05 Jan 2012 21:31:40 GMT' + } + weird_algorithm = 'xxx-sha257' + + try: + original_algorithms = httpsig.sign.ALGORITHMS + httpsig.sign.ALGORITHMS = frozenset([weird_algorithm]) + Verifier(secret=self.verify_secret, algorithm=weird_algorithm)._verify(data='123', signature='456') + self.fail('No exception was raised for false algorithm') + except Exception as ex: + self.assertIsInstance(ex, HttpSigException) + finally: + httpsig.sign.ALGORITHMS = original_algorithms + class TestVerifyHMACSHA256(TestVerifyHMACSHA1): def setUp(self): super(TestVerifyHMACSHA256, self).setUp() self.algorithm = "hmac-sha256" + class TestVerifyHMACSHA512(TestVerifyHMACSHA1): def setUp(self): super(TestVerifyHMACSHA512, self).setUp() @@ -146,21 +170,23 @@ def setUp(self): private_key_path = os.path.join(os.path.dirname(__file__), 'rsa_private.pem') with open(private_key_path, 'rb') as f: private_key = f.read() - + public_key_path = os.path.join(os.path.dirname(__file__), 'rsa_public.pem') with open(public_key_path, 'rb') as f: public_key = f.read() - + self.keyId = "Test" self.algorithm = "rsa-sha1" self.sign_secret = private_key self.verify_secret = public_key + class TestVerifyRSASHA256(TestVerifyRSASHA1): def setUp(self): super(TestVerifyRSASHA256, self).setUp() self.algorithm = "rsa-sha256" + class TestVerifyRSASHA512(TestVerifyRSASHA1): def setUp(self): super(TestVerifyRSASHA512, self).setUp() diff --git a/tox.ini b/tox.ini index 5add957..2d34ea2 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,10 @@ [tox] -envlist = py27, py32, py33, py34 +envlist = py27, py33, py34, py35, py36 [testenv] -commands = python setup.py test +passenv = TRAVIS TRAVIS_JOB_ID TRAVIS_BRANCH +deps = + coverage +commands = + coverage run --source=httpsig setup.py test + coverage report -m