diff --git a/pymisp/tools/__init__.py b/pymisp/tools/__init__.py index 45907b39..5e41f27b 100644 --- a/pymisp/tools/__init__.py +++ b/pymisp/tools/__init__.py @@ -13,6 +13,9 @@ from .asnobject import ASNObject # noqa from .geolocationobject import GeolocationObject # noqa from .git_vuln_finder_object import GitVulnFinderObject # noqa +from .attributevalidationtool import ( #noqa + AttributeValidationTool, validate_attribute, validate_attributes, + validate_event, validate_object, validate_objects, ValidationError) from .vehicleobject import VehicleObject # noqa from .csvloader import CSVLoader # noqa @@ -51,5 +54,8 @@ 'GitVulnFinderObject', 'VehicleObject', 'CSVLoader', 'SSHAuthorizedKeysObject', 'feed_meta_generator', 'update_objects', 'EMailObject', 'URLObject', 'PEObject', 'PESectionObject', 'ELFObject', - 'ELFSectionObject', 'MachOObject', 'MachOSectionObject' + 'ELFSectionObject', 'MachOObject', 'MachOSectionObject', + 'AttributeValidationTool', 'validate_attribute', 'validate_attributes', + 'validate_event', 'validate_object', 'validate_objects', + 'ValidationError' ] diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py new file mode 100644 index 00000000..6c00ac60 --- /dev/null +++ b/pymisp/tools/attributevalidationtool.py @@ -0,0 +1,849 @@ +#!/usr/bin/env python3 + +import ipaddress +import json +import logging +import re +from base64 import b64decode +from datetime import datetime +from dateutil.parser import parse +from pymisp import MISPAttribute, MISPEvent, MISPObject +from pymisp.exceptions import PyMISPError +from typing import Generator +from urllib.parse import urlparse + +HASH_HEX_LENGTH = { + 'authentihash': 64, + 'md5': 32, + 'imphash': 32, + 'telfhash': 70, + 'sha1': 40, + 'git-commit-id': 40, + 'x509-fingerprint-md5': 32, + 'x509-fingerprint-sha1': 40, + 'x509-fingerprint-sha256': 64, + 'ja3-fingerprint-md5': 32, + 'jarm-fingerprint': 62, + 'hassh-md5': 32, + 'hasshserver-md5': 32, + 'pehash': 40, + 
'sha224': 56, + 'sha256': 64, + 'sha384': 96, + 'sha512': 128, + 'sha512/224': 56, + 'sha512/256': 64, + 'sha3-224': 56, + 'sha3-256': 64, + 'sha3-384': 96, + 'sha3-512': 128, + 'dom-hash': 32, +} +HTTP_METHODS = ( + 'OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT', + 'PROPFIND', 'PROPPATCH', 'MKCOL', 'COPY', 'MOVE', 'LOCK', 'UNLOCK', + 'VERSION-CONTROL', 'REPORT', 'CHECKOUT', 'CHECKIN', 'UNCHECKOUT', + 'MKWORKSPACE', 'UPDATE', 'LABEL', 'MERGE', 'BASELINE-CONTROL', + 'MKACTIVITY', 'ORDERPATCH', 'ACL', 'PATCH', 'SEARCH' +) +REFANG_REGEX_TABLE = ( + { + 'from': re.compile(r'^(hxxp|hxtp|htxp|meow|h\[tt\]p)', re.IGNORECASE), + 'to': 'http', + 'types': ('link', 'url') + }, + { + 'from': re.compile(r'(\[\.\]|\[dot\]|\(dot\))', re.IGNORECASE), + 'to': '.', + 'types': ( + 'link', 'url', 'ip-dst', 'ip-src', 'domain|ip', 'domain', + 'hostname', 'email', 'email-src', 'email-dst' + ) + }, + { + 'from': re.compile(r'\[hxxp:\/\/\]', re.IGNORECASE), + 'to': 'http', + 'types': ('link', 'url') + }, + { + 'from': re.compile(r'\[\@\]|\[at\]', re.IGNORECASE), + 'to': '@', + 'types': ('email', 'email-src', 'email-dst') + }, + { + 'from': re.compile(r'\[:\]'), + 'to': ':', + 'types': ('link', 'url') + } +) +VULNERABILITY_REGEXES = ( + r'CVE-\d{4}-\d{4,}', + r'GCVE-\d+-\d{4}-\d+', + r'fkie_cve-\d{4}-\d{4,}', + r'ghsa-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}', + r'pysec-\d{4}-\d{2,5}', + r'gsd-\d{4}-\d{4,5}', + r'mal-\d{4}-\d+', + r'wid-sec-w-\d{4}-\d{4}', + r'ncsc-\d{4}-\d{4}', + r'ssa-\d{6}', + r'rh(ba|ea|sa)-\d{4}:\d{4,}', + r'ics(ma|a)-\d{2}-\d{3}-\d{2}', + r'va-\d{2}-\d{3}-\d{2}', + r'cisco-sa(-[a-zA-Z0-9_]+)+', + r'sca-\d{4}-\d{4,}', + r'nn-\d{4}[:_]\d-\d{2}', + r'oxas-adv-\d{4}-\d{4}', + r'msrc_cve-\d{4}-\d{4,}', + r'var-\d{6}-\d{4}', + r'jvndb-\d{4}-\d{6}', + r'ts-\d{4}-\d{4}', + r'(open)?suse-su-\d{4}:\d{4,}-\d', + r'cnvd-\d{4}-\d{5}', + r'certfr-\d{4}-avi-\d{4}', + r'certfr-\d{4}-ale-\d{3}' +) + +CDHASH_RE = re.compile(r'^[0-9a-f]{40,}$') +EMAIL_RE = 
re.compile(r'^.[^\s]*\@.*\..*$', flags=re.IGNORECASE) +DOMAIN_RE = re.compile(r'^[A-Z0-9.\-_]+\.[A-Z0-9\-]{2,}$', flags=re.IGNORECASE) +HEX_RE = re.compile(r'^[0-9a-fA-F]+$') +MAC_ADDRESS_RE = re.compile(r'^([a-f0-9]{2}:){5}[a-f0-9]{2}$') +MAC_EUI_64_RE = re.compile(r'^([a-f0-9]{2}:){3}ff:fe:(:[a-f0-9]{2}){3}$') +ONION_RE = re.compile(r'^([a-z2-7]{16}|[a-z2-7]{56})\.onion$') +REMOVE_NON_ALPHANUM_CAP_RE = re.compile(r'[^0-9A-Z]+') +REMOVE_NON_ALPHANUM_RE = re.compile(r'[^0-9A-Fa-f]') +REMOVE_NON_NUM_RE = re.compile(r'[^0-9]+') +REMOVE_PHONE_PARENTHESIS_RE = re.compile(r'\(0\)') +SANITISE_PHONE_NUMBER_RE = re.compile(r'[^\+0-9]+') +SSDEEP_RE = re.compile(r'^([0-9]+):([0-9a-zA-Z/+]*):([0-9a-zA-Z/+]*)$') +UUID_RE = re.compile(r'[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$') +VULNERABILITY_RE = re.compile( + r'^(?:' + '|'.join(VULNERABILITY_REGEXES) + r')$', flags=re.IGNORECASE +) +WEAKNESS_RE = re.compile(r"^CWE-[0-9]+$", flags=re.IGNORECASE) + +logger = logging.getLogger('pymisp') + + +class ValidationError(PyMISPError): + pass + + +class AttributeValidationTool: + @classmethod + def modifyBeforeValidation(cls, attribute_type, value): + if isinstance(value, str): + value = cls._refang_value(attribute_type, value.strip()) + match attribute_type: + case ('ip-src' | 'ip-dst'): + return cls._normalise_ip(value) + case ('md5' | 'sha1' | 'sha224' | 'sha256' | 'sha384' | 'sha512' | + 'sha512/224' | 'sha512/256' | 'sha3-224' | 'sha3-256' | + 'sha3-384' | 'sha3-512' | 'ja3-fingerprint-md5' | + 'jarm-fingerprint' | 'hassh-md5' | 'hasshserver-md5' | + 'hostname' | 'pehash' | 'authentihash' | 'vhash' | 'imphash' | + 'telfhash' | 'tlsh' | 'anonymised' | 'cdhash' | 'email' | + 'email-src' | 'email-dst' | 'target-email' | + 'whois-registrant-email' | 'dom-hash' | 'onion-address'): + return value.lower() + case 'domain': + value = value.lower().strip('.') + # Domain is not valid, try to convert to punycode + if not cls._is_domain_valid(value): + return 
value.encode('idna').decode('ascii') + return value + case 'domain|ip': + parts = value.lower().split('|') + if len(parts) != 2: + return value # not a composite + domain, ip = parts + domain = domain.strip('.') + # Domain is not valid, try to convert to punycode + if not cls._is_domain_valid(domain): + domain = domain.encode('idna').decode('ascii') + return f'{domain}|{cls._normalise_ip(ip)}' + case ('filename|md5' | 'filename|sha1' | 'filename|imphash' | + 'filename|sha224' | 'filename|sha256' | 'filename|sha384' | + 'filename|sha512' | 'filename|sha512/224' | + 'filename|sha512/256' | 'filename|sha3-224' | + 'filename|sha3-256' | 'filename|sha3-384' | + 'filename|sha3-512' | 'filename|authentihash' | + 'filename|vhash' | 'filename|pehash' | 'filename|tlsh'): + # Convert hash to lowercase + composite = value.split('|') + if len(composite) != 2: + return value # not a composite + filename, _hash = composite + return f'{filename}|{_hash.lower()}' + case 'http-method' | 'hex': + return value.upper() + case 'vulnerability': + value = value.replace('–', '-') + source = value.split('-')[0] + if source in ('cve', 'gcve'): + return value.upper() + return value + case 'weakness': + return value.replace('–', '-').upper() + case 'cc-number' | 'bin': + return re.sub(REMOVE_NON_NUM_RE, '', value) + case 'iban' | 'bic': + return re.sub(REMOVE_NON_ALPHANUM_CAP_RE, '', value.upper()) + case 'prtn' | 'whois-registrant-phone' | 'phone-number': + if value.startswith('00'): + value = f'+{value[2:]}' + value = re.sub(REMOVE_PHONE_PARENTHESIS_RE, '', value) + return re.sub(SANITISE_PHONE_NUMBER_RE, '', value) + case 'x509-fingerprint-md5' | 'x509-fingerprint-sha256' | 'x509-fingerprint-sha1': + return value.replace(':', '').lower() + case 'ip-dst|port' | 'ip-src|port': + if value.count(':') >= 2: # (ipv6|port) - tokenize ip and port + if '|' in value: # 2001:db8::1|80 + ip, port = value.split('|', 1) + return f'{cls._normalise_ip(ip)}|{port}' + if value.startswith('[') and ']' in 
value: # [2001:db8::1]:80 + ip, port = value[1:].split(']', 1) + return f'{cls._normalise_ip(ip)}|{port.lstrip(":")}' + for separator in ('.', ' port ', 'p', '#'): + if separator in value: + ip, port = value.split(separator, 1) + return f'{cls._normalise_ip(ip)}|{port}' + # 2001:db8::1:80 this one is ambiguous + *parts, port = value.split(':') + return f'{cls._normalise_ip(":".join(parts))}|{port}' + for separator in (':', '|'): + if separator in value: # ipv4:port or ipv4|port + ip, port = value.split(separator, 1) + return f'{cls._normalise_ip(ip)}|{port}' + return value + case 'mac-address' | 'mac-eui-64': + value = re.sub(REMOVE_NON_ALPHANUM_RE, '', value).lower() + return ':'.join(value[i:i+2] for i in range(0, 12, 2)) + case 'hostname|port': + return value.replace(':', '|').lower() + case 'boolean': + if isinstance(value, int): + return bool(value) + if isinstance(value, str): + value = value.lower() + if value == 'true': + return True + if value == 'false': + return False + return value + case 'datetime': + if isinstance(value, str): + try: + return datetime.fromisoformat(value) + except ValueError: + try: + return parse(value) + except Exception: + return value + return value + case 'AS': + if value.upper().startswith('AS'): + value = value[2:] # remove 'AS' + if '.' 
in value: # maybe value is in asdot notation + multiplier, remainder = value.split('.', 1) + if cls._is_positive_integer(multiplier) and cls._is_positive_integer(remainder): + return int(multiplier) * 65536 + int(remainder) + return value + case _: + return value + + @classmethod + def validate(cls, attribute_type, value): + match attribute_type: + case ('md5' | 'imphash' | 'sha1' | 'sha224' | 'sha256' | 'sha384' | + 'sha512' | 'sha512/224' | 'sha512/256' | 'sha3-224' | + 'sha3-256' | 'sha3-384' | 'sha3-512' | 'authentihash' | + 'ja3-fingerprint-md5' | 'jarm-fingerprint' | 'hassh-md5' | + 'hasshserver-md5' | 'x509-fingerprint-md5' | + 'x509-fingerprint-sha256' | 'x509-fingerprint-sha1' | + 'git-commit-id' | 'dom-hash'): + if cls._is_hash_valid(attribute_type, value): + return True + length = HASH_HEX_LENGTH[attribute_type] + return ( + 'Checksum has an invalid length or format (expected: ' + f'{length} hexadecimal characters). Please double check ' + 'the value or select type "other".' + ) + case 'tlsh': + if cls._is_tlsh_valid(value): + return True + return ( + 'Checksum has an invalid length or format (expected: at ' + 'least 35 hexadecimal characters, optionally starting ' + 'with t1 instead of hexadecimal characters). Please ' + 'double check the value or select type "other".' + ) + case 'telfhash': + if cls._is_telfhash_valid(value): + return True + return ( + 'Checksum has an invalid length or format (expected: ' + '70 or 72 hexadecimal characters). Please double check ' + 'the value or select type "other".' + ) + case 'pehash': + if cls._is_hash_valid('pehash', value): + return True + return ( + "The input doesn't match the expected sha1 format " + '(expected: 40 hexadecimal characters). Keep in mind that ' + 'MISP currently only supports SHA1 for PEhashes, if you ' + 'would like to get the support extended to other hash ' + 'types, make sure to create a github ticket about it at ' + 'https://github.com/MISP/MISP!' 
+ ) + case 'ssdeep': + if cls._is_ssdeep(value): + return True + return 'Invalid SSDeep hash. The format has to be blocksize:hash:hash' + case 'impfuzzy': + if value.count(':') == 2: + imports, *_ = value.split(':') + if cls._is_positive_integer(imports): + return True + return 'Invalid impfuzzy format. The format has to be imports:hash:hash' + case 'cdhash': + if CDHASH_RE.fullmatch(value): + return True + return ( + "The input doesn't match the expected format " + '(expected: 40 or more hexadecimal characters)' + ) + case 'http-method': + if value in HTTP_METHODS: + return True + return 'Unknown HTTP method.' + case 'filename|pehash': + if re.fullmatch(r'^.+\|[0-9a-f]{40}$', value): + return True + return ( + "The input doesn't match the expected filename|sha1 format " + '(expected: filename|40 hexadecimal characters). Keep in ' + 'mind that MISP currently only supports SHA1 for PEhashes, ' + 'if you would like to get the support extended to other ' + 'hash types, make sure to create a github ticket about it ' + 'at https://github.com/MISP/MISP!' + ) + case ('filename|md5' | 'filename|sha1' | 'filename|imphash' | + 'filename|sha224' | 'filename|sha256' | 'filename|sha384' | + 'filename|sha512' | 'filename|sha512/224' | + 'filename|sha512/256' | 'filename|sha3-224' | + 'filename|sha3-256' | 'filename|sha3-384' | + 'filename|sha3-512' | 'filename|authentihash'): + length = HASH_HEX_LENGTH[attribute_type[9:]] # strip `filename|`] + if re.fullmatch(r'^.+\|[0-9a-f]{' + str(length) + r'}$', value): + return True + return ( + 'Checksum has an invalid length or format (expected:' + f'filename|{length} hexadecimal characters). Please' + 'double check the value or select type "other".' + ) + case 'filename|ssdeep': + composite = value.split('|') + if len(composite) == 2: + filename, ssdeep = composite + if '\n' in filename: + return 'Filename must not contain new line character.' 
+ if cls._is_ssdeep(ssdeep): + return True + return 'Invalid ssdeep hash (expected: blocksize:hash:hash).' + case 'filename|tlsh': + composite = value.split('|') + if len(composite) == 2: + filename, tlsh = composite + if '\n' in filename: + return 'Filename must not contain new line character.' + if cls._is_tlsh_valid(tlsh): + return True + return ( + 'TLSH hash has an invalid length or format (expected: ' + 'filename|at least 35 hexadecimal characters, optionally ' + 'starting with t1 instead of hexadecimal characters). ' + 'Please double check the value or select type "other".' + ) + case 'filename|vhash': + if re.fullmatch(r'^.+\|.+$', value): + return True + return ( + 'Checksum has an invalid length or format (expected: ' + 'filename|string characters). Please double check the ' + 'value or select type "other".' + ) + case 'ip-src' | 'ip-dst': + return cls._validate_ip(value) + case 'port': + if cls._is_port_valid(value): + return True + return 'Port numbers have to be integers between 1 and 65535.' + case 'ip-dst|port' | 'ip-src|port': + composite = value.split('|') + if len(composite) != 2: + return 'Invalid ip-dst|port format.' + ip, port = composite + if not cls._is_port_valid(port): + return 'Port numbers have to be integers between 1 and 65535.' + return cls._validate_ip(ip) + case 'onion-address': + if ONION_RE.fullmatch(value): + return True + return 'Onion address has an invalid format.' + case 'mac-address': + if MAC_ADDRESS_RE.fullmatch(value): + return True + return 'MAC address has an invalid format.' + case 'mac-eui-64': + if MAC_EUI_64_RE.fullmatch(value): + return True + return 'MAC EUI-64 address has an invalid format.' + case 'hostname' | 'domain': + if cls._is_domain_valid(value): + return True + return ( + f'{attribute_type.capitalize()} has an invalid format. ' + 'Please double check the value or select type "other".' 
+ ) + case 'hostname|port': + composite = value.split('|') + if len(composite) != 2: + return 'Invalid hostname|port format.' + hostname, port = composite + if not cls._is_domain_valid(hostname): + return 'Hostname has an invalid format.' + if not cls._is_port_valid(port): + return 'Port numbers have to be integers between 1 and 65535.' + return True + case 'domain|ip': + composite = value.split('|') + if len(composite) != 2: + return 'Invalid domain|ip format.' + domain, ip = composite + if not cls._is_domain_valid(domain): + return 'Domain has an invalid format.' + return cls._validate_ip(ip) + case ('email' | 'email-src' | 'eppn' | 'email-dst' | 'target-email' | + 'whois-registrant-email' | 'dns-soa-email' | 'jabber-id'): + # we don't use the native function to prevent issues with partial email addresses + if EMAIL_RE.fullmatch(value): + return True + return ( + 'Email address has an invalid format. Please double ' + 'check the value or select type "other".' + ) + case 'vulnerability': + if VULNERABILITY_RE.fullmatch(value): + return True + return 'Invalid vulnerability ID format.' + case 'weakness': + if WEAKNESS_RE.fullmatch(value): + return True + return 'Invalid format. Expected: CWE-x...' + case 'windows-service-name' | 'windows-service-displayname': + if len(value) > 256 or re.search(r'[\\/]', value): + return ( + 'Invalid format. Only values shorter than 256 characters ' + "that don't include any forward or backward slashes are allowed." 
+ ) + return True + case ('mutex' | 'process-state' | 'snort' | 'bro' | 'zeek' | + 'community-id' | 'anonymised' | 'pattern-in-file' | + 'pattern-in-traffic' | 'pattern-in-memory' | 'filename-pattern' | + 'pgp-public-key' | 'pgp-private-key' | 'yara' | 'stix2-pattern' | + 'sigma' | 'gene' | 'kusto-query' | 'mime-type' | + 'identity-card-number' | 'cookie' | 'attachment' | + 'malware-sample' | 'comment' | 'text' | 'other' | 'cpe' | + 'email-attachment' | 'email-body' | 'email-header' | + 'first-name' | 'middle-name' | 'last-name' | 'full-name'): + return True + case 'link': + parsed = urlparse(value) + if all([parsed.scheme, parsed.netloc]): + return True + return 'Link has to be a valid URL.' + case 'hex': + if HEX_RE.fullmatch(value): + return True + return 'Value has to be a hexadecimal string.' + case ('target-user' | 'campaign-name' | 'campaign-id' | + 'threat-actor' | 'target-machine' | 'target-org' | + 'target-location' | 'target-external' | 'email-subject' | + 'malware-type' | 'url' | 'uri' | 'user-agent' | 'regkey' | + 'regkey|value' | 'filename' | 'pdb' | 'windows-scheduled-task' | + 'whois-registrant-name' | 'whois-registrant-org' | + 'whois-registrar' | 'whois-creation-date' | 'date-of-birth' | + 'place-of-birth' | 'gender' | 'passport-number' | + 'passport-country' | 'passport-expiration' | 'redress-number' | + 'nationality' | 'visa-number' | 'issue-date-of-the-visa' | + 'primary-residence' | 'country-of-residence' | + 'special-service-request' | 'frequent-flyer-number' | + 'travel-details' | 'payment-details' | + 'place-port-of-original-embarkation' | 'place-port-of-clearance' | + 'place-port-of-onward-foreign-destination' | + 'passenger-name-record-locator-number' | + 'email-dst-display-name' | 'email-src-display-name' | + 'email-reply-to' | 'email-x-mailer' | 'email-mime-boundary' | + 'email-thread-index' | 'email-message-id' | 'github-username' | + 'github-repository' | 'github-organisation' | 'twitter-id' | + 'dkim' | 'dkim-signature' | 
'favicon-mmh3' | + 'chrome-extension-id' | 'mobile-application-id' | + 'azure-application-id' | 'named pipe'): + if '\n' in value: + return 'Value must not contain new line character.' + return True + case 'ssh-fingerprint': + if cls._is_ssh_fingerprint(value): + return True + return 'SSH fingerprint must be in MD5 or SHA256 format.' + case 'datetime': + if isinstance(value, datetime): + return True + try: + parse(value) + return True + except Exception: + return 'Datetime has to be in the ISO 8601 format.' + case 'size-in-bytes' | 'counter': + if cls._is_positive_integer(value): + return True + return 'The value has to be a whole number greater or equal 0.' + # case 'targeted-threat-index': + # if (!is_numeric($value) || $value < 0 || $value > 10) { + # return __('The value has to be a number between 0 and 10.'); + # } + # return True + case 'integer': + try: + int(value) + return True + except ValueError: + return 'The value has to be an integer value.' + case 'iban' | 'bic' | 'btc' | 'dash' | 'xmr': + if value.isalnum(): + return True + return f'{attribute_type.upper()} has to be alphanumeric.' + case 'vhash': + if len(value) > 0: + return True + return 'Vhash must not be an empty string.' + case ('bin' | 'cc-number' | 'bank-account-nr' | 'aba-rtn' | 'prtn' | + 'phone-number' | 'whois-registrant-phone' | 'float'): + try: + float(value) + return True + except ValueError: + return f'The value has to be a valid {attribute_type}' + case 'cortex': + try: + json.loads(value) + return True + except json.JSONDecodeError: + return 'The Cortex analysis result has to be a valid JSON string.' + case 'boolean': + if isinstance(value, bool): + return True + return 'The value has to be either true or false.' + case 'AS': + if cls._is_positive_integer(value) and int(value) <= 4294967295: + return True + return 'AS number have to be integer between 1 and 4294967295' + case 'uuid': + if UUID_RE.fullmatch(value): + return True + return 'The value has to be a valid UUID format.' 
+ case _: + return value + + @staticmethod + def _handle_4byte_unicode(value): + # Replace 4-byte UTF-8 characters with '?' + return ''.join(ch if ord(ch) <= 0xFFFF else '?' for ch in value) + + @staticmethod + def _is_domain_valid(value): + return DOMAIN_RE.fullmatch(value) + + @staticmethod + def _is_hash_valid(attribute_type, value): + return len(value) == HASH_HEX_LENGTH[attribute_type] and HEX_RE.fullmatch(value) + + @classmethod + def _is_port_valid(cls, value): + return cls._is_positive_integer(value) and int(value) in range(1, 65536) + + @staticmethod + def _is_positive_integer(value: int | str) -> bool: + if isinstance(value, int): + return value >= 0 + return value.isdigit() and int(value) >= 0 + + @staticmethod + def _is_ssdeep(value): + return SSDEEP_RE.fullmatch(value) + + @classmethod + def _is_ssh_fingerprint(cls, value): + if value.startswith('SHA256:'): + try: + decoded = b64decode(value[7:]) + except Exception: + return False + return decoded is not None and len(decoded) == 32 + if value.startswith('MD5:'): + return cls._is_hash_valid('md5', value[3:].replace(':', '')) + return cls._is_hash_valid('md5', value.replace(':', '')) + + @staticmethod + def _is_tlsh_valid(value): + if value.startswith('t'): + value = value.lstrip('t') + return len(value) > 35 and HEX_RE.fullmatch(value) + + @staticmethod + def _is_telfhash_valid(value): + return len(value) in (70, 72) and HEX_RE.fullmatch(value) + + @staticmethod + def _normalise_ip(value): + # If IP is a CIDR + if '/' in value: + address, length = value.split('/', 2) + if ':' in address: + try: + address = str(ipaddress.IPv6Address(address)) + except ipaddress.AddressValueError: + return value + if length == '128': + return address + else: + try: + address = str(ipaddress.IPv4Address(address)) + except ipaddress.AddressValueError: + return value + if length == '32': + return address + return f'{address}/{length}' + try: + return ( + str(ipaddress.IPv6Address(value)) + if ':' in value + else 
logger = logging.getLogger('pymisp')


class ValidationError(PyMISPError):
    """Raised when an Attribute value is rejected by type-specific validation."""


def _normalise_and_validate(attribute: MISPAttribute):
    """Return ``(normalised value, verdict)`` for ``attribute``.

    The verdict is ``True`` or the error message produced by
    ``AttributeValidationTool.validate``.
    """
    value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value)
    return value, AttributeValidationTool.validate(attribute.type, value)


def _store_normalised_value(attribute: MISPAttribute, value) -> None:
    """Write ``value`` back to ``attribute`` while keeping its ``edited`` flag.

    NOTE(review): presumably assigning ``value`` marks the attribute as
    edited, which is why the previous flag is restored - confirm against
    pymisp's change tracking.
    """
    if attribute.value != value:
        is_edited = attribute.edited
        attribute.value = value
        attribute.edited = is_edited


def validate_attribute(attribute: dict | MISPAttribute) -> MISPAttribute:  # type: ignore
    """
    Validates a MISP Attribute and returns a MISPAttribute if valid.
    Replicates MISP server-side validation behavior on Attributes.

    :param attribute: dict or MISPAttribute to validate
    :return: Validated MISPAttribute object, with its value normalised
    :raises PyMISPError: If the attribute cannot be loaded or a validation error occurs
    :raises ValidationError: If the attribute is invalid
    """
    if not isinstance(attribute, MISPAttribute):
        try:
            attribute = _load_misp_attribute(attribute)
        except Exception as e:
            message = f'Error loading Attribute: {e}'
            logger.error(message)
            # Chain explicitly so the root cause stays attached to the error.
            raise PyMISPError(message) from e
    try:
        value, validated = _normalise_and_validate(attribute)
    except Exception as e:
        message = f'Error validating Attribute <{attribute.uuid}>: {e}'
        logger.error(message)
        raise PyMISPError(message) from e
    if validated is not True:
        message = _message_logging(validated, attribute)
        logger.warning(message)
        raise ValidationError(message)
    _store_normalised_value(attribute, value)
    return attribute


def validate_attributes(attributes: list, errors: dict) -> Generator:  # type: ignore
    """
    Validates a list of MISP attributes and skips any that doesn't validate.

    :param attributes: List of MISPAttribute objects
    :param errors: Dictionary to populate with any validation error messages
        (rejected values under 'warnings', processing failures under 'errors')
    :return: Generator yielding only valid MISPAttribute objects
    """
    for attribute in attributes:
        try:
            misp_attribute = validate_attribute(attribute)
        except ValidationError as e:
            # Must be caught before PyMISPError, which is its base class.
            _populate_error_message(errors, 'warnings', str(e))
            continue
        except PyMISPError as e:
            _populate_error_message(errors, 'errors', str(e))
            continue
        yield misp_attribute


def validate_event(event: dict | MISPEvent, errors: dict) -> MISPEvent:  # type: ignore
    """
    Validates an event and skips Attributes or Object Attributes that don't validate.

    :param event: MISPEvent object or dict representing an event
    :param errors: Dictionary to populate with any validation error messages
    :return: MISPEvent with only valid attributes
    :raises PyMISPError: If the event cannot be loaded
    """
    if not isinstance(event, MISPEvent):
        try:
            event = _load_misp_event(event)
        except Exception as e:
            message = f'Error loading Event: {e}'
            logger.error(message)
            raise PyMISPError(message) from e
    # Validation of Attributes
    event.attributes = list(validate_attributes(event.attributes, errors))
    # Validation of Objects
    event.objects = list(validate_objects(event.objects, errors))
    return event


def validate_object(misp_object: dict | MISPObject, errors: dict) -> MISPObject:  # type: ignore
    """
    Validates an object and skips any Object Attribute that doesn't validate.

    :param misp_object: MISPObject object or dict representing an object
    :param errors: Dictionary to populate with any validation error messages
    :return: MISPObject with only valid attributes
    :raises PyMISPError: If the object cannot be loaded
    """
    if not isinstance(misp_object, MISPObject):
        try:
            misp_object = _load_misp_object(misp_object)
        except Exception as e:
            message = f'Error loading Object: {e}'
            logger.error(message)
            raise PyMISPError(message) from e
    is_edited = misp_object.edited
    # Validation of Object Attributes
    misp_object.attributes = list(_validate_object_attributes(misp_object, errors))
    misp_object.edited = is_edited
    return misp_object


def validate_objects(misp_objects: list, errors: dict) -> Generator:  # type: ignore
    """
    Validates a list of MISP objects and skips any Object Attribute that
    doesn't validate.

    :param misp_objects: List of MISPObject objects
    :param errors: Dictionary to populate with any validation error messages
    :return: Generator yielding the MISPObject objects that could be loaded
    """
    for candidate in misp_objects:
        try:
            misp_object = validate_object(candidate, errors)
        except PyMISPError as e:
            _populate_error_message(errors, 'errors', str(e))
            continue
        yield misp_object


def _load_misp_attribute(attribute: dict) -> MISPAttribute:  # type: ignore
    """Build a MISPAttribute from its dict representation."""
    misp_attribute = MISPAttribute()
    misp_attribute.from_dict(**attribute)
    return misp_attribute


def _load_misp_event(event: dict) -> MISPEvent:  # type: ignore
    """Build a MISPEvent from its dict representation."""
    misp_event = MISPEvent()
    misp_event.from_dict(**event)
    return misp_event


def _load_misp_object(mispObject: dict) -> MISPObject:  # type: ignore
    """Build a MISPObject from its dict representation (requires a 'name' key)."""
    misp_object = MISPObject(mispObject['name'])
    misp_object.from_dict(**mispObject)
    return misp_object


def _message_logging(validated: str, attribute: MISPAttribute, misp_object: MISPObject | None = None) -> str:
    """Format the message reported for an attribute that failed validation."""
    message = f'Failed validation for {attribute.type} Attribute <{attribute.uuid}>'
    if misp_object is not None:
        message = f'{message} in {misp_object.name} Object <{misp_object.uuid}>'
    return f'{message}:\n{attribute.value} - {validated}'


def _populate_error_message(errors: dict[str, list[str]], key: str, message: str) -> None:
    """Append ``message`` to ``errors[key]``, creating the list on first use."""
    try:
        errors[key].append(message)
    except KeyError:
        errors[key] = [message]


def _validate_object_attributes(misp_object: MISPObject, errors: dict) -> Generator:  # type: ignore
    """Yield the attributes of ``misp_object`` that validate; record the rest in ``errors``."""
    for attribute in misp_object.attributes:
        try:
            value, validated = _normalise_and_validate(attribute)
        except Exception as e:
            message = f'Error validating Object Attribute <{attribute.uuid}> in Object <{misp_object.uuid}>: {e}'
            logger.error(message)
            _populate_error_message(errors, 'errors', message)
            continue
        if validated is not True:
            message = _message_logging(validated, attribute, misp_object)
            logger.warning(message)
            _populate_error_message(errors, 'warnings', message)
            continue
        _store_normalised_value(attribute, value)
        yield attribute
continue + if validated is not True: + message = _message_logging(validated, attribute, misp_object) + logger.warning(message) + _populate_error_message(errors, 'warnings', message) + continue + if attribute.value != value: + attribute.value = value + attribute.edited = is_edited + yield attribute diff --git a/tests/test_attributevalidationtool.py b/tests/test_attributevalidationtool.py new file mode 100644 index 00000000..bf2afee6 --- /dev/null +++ b/tests/test_attributevalidationtool.py @@ -0,0 +1,415 @@ +import unittest +from collections import defaultdict +from datetime import datetime +from pymisp import MISPAttribute, MISPObject +from pymisp.tools import ( + AttributeValidationTool, validate_attribute, validate_attributes, + validate_event, validate_object, validate_objects, ValidationError +) + +class TestAttributeValidationTool(unittest.TestCase): + + def _should_be_valid(self, type_, *values): + for value in values: + self.assertTrue(AttributeValidationTool.validate(type_, value)) + + def _should_be_invalid(self, type_, *values): + for value in values: + self.assertNotEqual( + AttributeValidationTool.validate(type_, value), True + ) + + def test_modify_before_validation_as(self): + self.assertEqual('123', AttributeValidationTool.modifyBeforeValidation('AS', 'AS123')) + self.assertEqual(65537, AttributeValidationTool.modifyBeforeValidation('AS', '1.1')) + + def test_modify_before_validation_boolean(self): + self.assertEqual(True, AttributeValidationTool.modifyBeforeValidation('boolean', 'True')) + self.assertEqual(False, AttributeValidationTool.modifyBeforeValidation('boolean', 'False')) + self.assertEqual(True, AttributeValidationTool.modifyBeforeValidation('boolean', 1)) + self.assertEqual(False, AttributeValidationTool.modifyBeforeValidation('boolean', 0)) + + def test_modify_before_validation_domain(self): + self.assertEqual('example.com', AttributeValidationTool.modifyBeforeValidation('domain', 'example.com')) + self.assertEqual('example.com', 
AttributeValidationTool.modifyBeforeValidation('domain', 'EXAMPLE.COM')) + self.assertEqual('example.com|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'example.com|127.0.0.1')) + self.assertEqual('example.com|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'EXAMPLE.COM|127.0.0.1')) + self.assertEqual('xn--hkyrky-ptac70bc.cz', AttributeValidationTool.modifyBeforeValidation('domain', 'háčkyčárky.cz')) + self.assertEqual('xn--hkyrky-ptac70bc.cz', AttributeValidationTool.modifyBeforeValidation('domain', 'HÁČKYČÁRKY.CZ')) + self.assertEqual('xn--hkyrky-ptac70bc.cz|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'háčkyčárky.cz|127.0.0.1')) + self.assertEqual('xn--hkyrky-ptac70bc.cz|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'HÁČKYČÁRKY.CZ|127.0.0.1')) + + def test_modify_before_validation_filename_hash(self): + self.assertEqual('CMD.EXE|0cc175b9c0f1b6a831c399e269772661', AttributeValidationTool.modifyBeforeValidation('filename|md5', 'CMD.EXE|0CC175B9C0F1B6A831C399E269772661')) + + def test_modify_before_validation_financial(self): + self.assertEqual('123456', AttributeValidationTool.modifyBeforeValidation('cc-number', '1234-56')) + self.assertEqual('123456', AttributeValidationTool.modifyBeforeValidation('bin', '1234 56')) + self.assertEqual('ABC12', AttributeValidationTool.modifyBeforeValidation('iban', 'abc-12')) + self.assertEqual('ABC12', AttributeValidationTool.modifyBeforeValidation('bic', 'abc 12')) + + def test_modify_before_validation_hostname(self): + self.assertEqual('example.com|80', AttributeValidationTool.modifyBeforeValidation('hostname|port', 'example.com:80')) + self.assertEqual('example.com|80', AttributeValidationTool.modifyBeforeValidation('hostname|port', 'EXAMPLE.COM:80')) + + def test_modify_before_validation_ip(self): + self.assertEqual('127.0.0.1', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1/32')) + 
self.assertEqual('127.0.0.1/31', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1/31')) + self.assertEqual('example.com|1234:fd2:5621:1:89::4500', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'example.com|1234:0fd2:5621:0001:0089:0000:0000:4500/128')) + self.assertEqual('1234:fd2:5621:1:89::4500|80', AttributeValidationTool.modifyBeforeValidation('ip-src|port', '1234:0fd2:5621:0001:0089:0000:0000:4500/128|80')) + self.assertEqual('1234:fd2:5621:1:89::4500/127|80', AttributeValidationTool.modifyBeforeValidation('ip-src|port', '1234:0fd2:5621:0001:0089:0000:0000:4500/127|80')) + self.assertEqual('127.0.0.1', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1')) + + def test_modify_before_validation_ipv6(self): + self.assertEqual('1234:fd2:5621:1:89::4500', AttributeValidationTool.modifyBeforeValidation('ip-src', '1234:0fd2:5621:0001:0089:0000:0000:4500')) + self.assertEqual('example.com|1234:fd2:5621:1:89::4500', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'example.com|1234:0fd2:5621:0001:0089:0000:0000:4500')) + self.assertEqual('1234:fd2:5621:1:89::4500|80', AttributeValidationTool.modifyBeforeValidation('ip-src|port', '1234:0fd2:5621:0001:0089:0000:0000:4500|80')) + self.assertEqual('127.0.0.1', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1')) + + def test_modify_before_validation_hashes(self): + # Hashes should be lowercased + for type_ in ['md5', 'sha1', 'sha256', 'email', 'hostname']: + self.assertEqual('abc', AttributeValidationTool.modifyBeforeValidation(type_, 'ABC')) + self.assertEqual('abc', AttributeValidationTool.modifyBeforeValidation(type_, ' AbC ')) + + def test_modify_before_validation_mac(self): + self.assertEqual('aa:bb:cc:dd:ee:ff', AttributeValidationTool.modifyBeforeValidation('mac-address', 'AA-BB-CC-DD-EE-FF')) + self.assertEqual('aa:bb:cc:dd:ee:ff', AttributeValidationTool.modifyBeforeValidation('mac-address', 'aabbccddeeff')) + + def 
test_modify_before_validation_phone(self): + self.assertEqual('+123456', AttributeValidationTool.modifyBeforeValidation('phone-number', '00123456')) + self.assertEqual('+123456', AttributeValidationTool.modifyBeforeValidation('phone-number', '+1 (23) 456')) + self.assertEqual('+123456', AttributeValidationTool.modifyBeforeValidation('prtn', '00123456')) + + def test_modify_before_validation_uppercase(self): + # HTTP methods and hex should be uppercased + self.assertEqual('POST', AttributeValidationTool.modifyBeforeValidation('http-method', 'post')) + self.assertEqual('AABB', AttributeValidationTool.modifyBeforeValidation('hex', 'aabb')) + + def test_modify_before_validation_vulnerability(self): + self.assertEqual('CVE-2020-1234', AttributeValidationTool.modifyBeforeValidation('vulnerability', 'CVE-2020-1234')) + self.assertEqual('CVE-2020-1234', AttributeValidationTool.modifyBeforeValidation('vulnerability', 'cve-2020-1234')) + self.assertEqual('CVE-2020-1234', AttributeValidationTool.modifyBeforeValidation('vulnerability', 'CVE–2020–1234')) # en-dash + + def test_modify_before_validation_weakness(self): + self.assertEqual('CWE-89', AttributeValidationTool.modifyBeforeValidation('weakness', 'cwe-89')) + self.assertEqual('CWE-89', AttributeValidationTool.modifyBeforeValidation('weakness', 'CWE–89')) + + def test_modify_before_validation_x509(self): + self.assertEqual('aa6fc83f37787abea6be2c5126163fd3', AttributeValidationTool.modifyBeforeValidation('x509-fingerprint-md5', 'AA:6F:C8:3F:37:78:7A:BE:A6:BE:2C:51:26:16:3F:D3')) + + def test_validate_as(self): + self._should_be_valid('AS', '0', 0, 1, '1', 4294967295, '1.1') + self._should_be_invalid('AS', '1.2.3.4') + + def test_validate_domain_ip(self): + self._should_be_valid( + 'domain|ip', 'example.com|127.0.0.1', 'example.com|::1' + ) + self._should_be_invalid( + 'domain|ip', 'example.com|127', 'example.com|1', + ) + + def test_validate_filename(self): + self._should_be_valid('filename', 'cmd.exe', 'cmd.com') + 
self._should_be_invalid('filename', 'cmd.exe\ncmd.com') + self._should_be_valid( + 'filename|md5', 'cmd.exe|0cc175b9c0f1b6a831c399e269772661', + 'cmd.com|0cc175b9c0f1b6a831c399e269772661' + ) + self._should_be_invalid('filename|md5', 'cmd.exe\ncmd.com|0cc175b9c0f1b6a831c399e269772661') + + def test_validate_hashes(self): + self._should_be_valid('filename|md5', 'cmd.exe|0cc175b9c0f1b6a831c399e269772661') + self._should_be_invalid('filename|md5', 'cmd.exe|86f7e437faa5a7fce15d1ddcb9eaeaea377667b8') + self._should_be_valid( + 'tlsh', + 'b2317c38fac0333c8ff7d3ff31fcf3b7fb3f9a3ef3bf3c880cfc43ebf97f3cc73fbfc', + 't1fdd4e000b6a1c034f1f612f849b6a3a4b53f7ea1677481cf12d916ea4a79af1ed31317' + ) + self._should_be_valid( + 'filename|tlsh', + 'cmd.exe|b2317c38fac0333c8ff7d3ff31fcf3b7fb3f9a3ef3bf3c880cfc43ebf97f3cc73fbfc', + 'cmd.exe|t1fdd4e000b6a1c034f1f612f849b6a3a4b53f7ea1677481cf12d916ea4a79af1ed31317' + ) + self._should_be_valid( + 'ssdeep', + '96:s4Ud1Lj96tHHlZDrwciQmA+4uy1I0G4HYuL8N3TzS8QsO/wqWXLcMSx:sF1LjEtHHlZDrJzrhuyZvHYm8tKp/RWO', + '384:EWo4X1WaPW9ZWhWzLo+lWpct/fWbkWsWIwW0/S7dZhgG8:EWo4X1WmW9ZWhWH/WpchfWgWsWTWtf8', + '6144:3wSQSlrBHFjOvwYAU/Fsgi/2WDg5+YaNk5xcHrYw+Zg+XrZsGEREYRGAFU25ttR/:ctM7E0L4q' + ) + self._should_be_valid( + 'filename|ssdeep', + 'ahoj.txt|96:s4Ud1Lj96tHHlZDrwciQmA+4uy1I0G4HYuL8N3TzS8QsO/wqWXLcMSx:sF1LjEtHHlZDrJzrhuyZvHYm8tKp/RWO' + ) + self._should_be_valid('dom-hash', '0cc175b9c0f1b6a831c399e269772661') + + self._should_be_valid('telfhash', 'a' * 70, 'a' * 72) + self._should_be_invalid('telfhash', 'a' * 69, 'z' * 70) # z is not hex + + self._should_be_valid('pehash', 'a' * 40) + self._should_be_invalid('pehash', 'a' * 39, 'z' * 40) + + self._should_be_valid('impfuzzy', '3:aabbcc:ddeeff') + self._should_be_invalid('impfuzzy', '3:aabbcc', 'x:aabbcc:ddeeff') + + self._should_be_valid('cdhash', 'a' * 40) + self._should_be_invalid('cdhash', 'a' * 39, 'z' * 40) + + self._should_be_valid('filename|vhash', 'file.txt|vhash123') + 
self._should_be_invalid('filename|vhash', 'file.txt') + + def test_validate_identifiers(self): + self._should_be_valid('vulnerability', 'CVE-2020-1234', 'GHSA-1234-1234-1234') + self._should_be_invalid('vulnerability', 'CVE-2020', 'invalid') + + self._should_be_valid('weakness', 'CWE-89') + self._should_be_invalid('weakness', 'CWE-ABC', 'invalid') + + self._should_be_valid('uuid', '123e4567-e89b-12d3-a456-426614174000') + self._should_be_invalid('uuid', '123e4567-e89b-12d3-a456-42661417400g', '123e4567-e89b-12d3-a456-42661417400') + + self._should_be_valid('target-user', 'user1') + self._should_be_invalid('target-user', 'user1\nuser2') + + def test_validate_ip(self): + for type_ in ['ip-src', 'ip-dst']: + self._should_be_valid( + type_, '127.0.0.1', '127.0.0.1/32', '::1', '::1/128' + ) + self._should_be_invalid( + type_, '127','127.0.0.', '127.0.0.1/', '127.0.0.1/32/1', + '127.0.0.1/128', '::1/257', '::1/257', '::1/128/1' + ) + + def test_validate_misc(self): + self._should_be_valid('windows-service-name', 'service1') + self._should_be_invalid('windows-service-name', 'service/1', 'service\\1', 'a' * 257) + + self._should_be_valid('link', 'http://example.com', 'ftp://example.com') + self._should_be_invalid('link', 'example.com') + + self._should_be_valid('hex', 'aabbcc') + self._should_be_invalid('hex', 'zz') + + self._should_be_valid('datetime', '2020-01-01T00:00:00') + self._should_be_invalid('datetime', '2020:01:01 00-00-00') + + self._should_be_valid('size-in-bytes', '1024', 1024) + self._should_be_invalid('size-in-bytes', '-1', 'abc') + self._should_be_valid('integer', '123', '-123') + self._should_be_invalid('integer', 'abc') + self._should_be_valid('float', '1.23', '-1.23') + self._should_be_invalid('float', 'abc') + + self._should_be_valid('iban', 'ALPHANUM123') + self._should_be_invalid('iban', 'invalid-char!') + self._should_be_valid('btc', '1BvBMSEYstWetqTFn5Au4m4GFg7xJaNVN2') + self._should_be_invalid('btc', 'invalid!') + + 
self._should_be_valid('cortex', '{"a": 1}') + self._should_be_invalid('cortex', '{a: 1}') + + self._should_be_valid('boolean', True, False) + self._should_be_invalid('boolean', 'maybe') + + def test_validate_networking(self): + self._should_be_valid('ip-dst|port', '127.0.0.1|80', '::1|80') + self._should_be_invalid('ip-dst|port', '127.0.0.1', '127.0.0.1|99999') + + self._should_be_valid('onion-address', 'abcdefghijklmnop.onion', 'abcdefghijklmnopqrstuvwxyz234567abcdefghijklmnopqrstuvwxyz23.onion') + self._should_be_invalid('onion-address', 'invalid.onion', 'abc.onion') + + self._should_be_valid('mac-address', 'aa:bb:cc:dd:ee:ff') + self._should_be_invalid('mac-address', 'aa:bb:cc:dd:ee:gg', 'aabbccddeeff') + + self._should_be_valid('mac-eui-64', 'aa:bb:cc:ff:fe:dd:ee:11') + self._should_be_invalid('mac-eui-64', 'aa:bb:cc:dd:ee:ff:00:11', 'aa:bb:cc:dd:ee:ff:00:gg') + + self._should_be_valid('hostname|port', 'example.com|80') + self._should_be_invalid('hostname|port', 'example.com', 'example.com|99999', 'invalid_domain|80') + + self._should_be_valid('email', 'test@example.com', 'a.b@c.d') + self._should_be_invalid('email', 'test@example', 'test.com') + + self._should_be_valid('http-method', 'GET', 'POST') + self._should_be_invalid('http-method', 'get', 'FIND') + + def test_validate_port(self): + self.assertTrue(AttributeValidationTool.validate('port', '1')) + self.assertTrue(AttributeValidationTool.validate('port', 1)) + self.assertTrue(AttributeValidationTool.validate('port', 80)) + self.assertNotEqual(AttributeValidationTool.validate('port', -80), True) + self.assertNotEqual(AttributeValidationTool.validate('port', '-80'), True) + + def test_validate_ssdeep(self): + self._should_be_valid('ssdeep', "768:+OFu8Q3w6QzfR5Jni6SQD7qSFDs6P93/q0XIc/UB5EPABWX:RFu8QAFzffJui79f13/AnB5EPAkX") + self._should_be_invalid('ssdeep', "768:+OFu8Q3w6QzfR5Jni6SQD7qSFDs6P93/q0XIc/UB5EPABWX\n\n:RFu8QAFzffJui79f13/AnB5EPAkX") + + def test_validate_ssh_fingerprint(self): + 
self._should_be_valid( + 'ssh-fingerprint', + '7b:e5:6f:a7:f4:f9:81:62:5c:e3:1f:bf:8b:57:6c:5a', + 'MD5:7b:e5:6f:a7:f4:f9:81:62:5c:e3:1f:bf:8b:57:6c:5a', + 'SHA256:mVPwvezndPv/ARoIadVY98vAC0g+P/5633yTC4d/wXE', + ) + + def test_validate_event(self): + event_dict = { + 'Event': { + 'info': 'Test Event', + 'Attribute': [ + {'type': 'ip-src', 'value': '1.1.1.1'}, # Valid + {'type': 'ip-src', 'value': '999.999.999.999'}, # Invalid + {'type': 'domain', 'value': 'google.com'}, # Valid + {'type': 'md5', 'value': 'invalid_md5'}, # Invalid + {'type': 'AS', 'value': '1.1'} # modified and valid + ], + 'Object': [ + { + 'name': 'file', + 'Attribute': [ + {'type': 'filename', 'object_relation': 'filename', 'value': 'test.txt'}, # Valid + {'type': 'md5', 'object_relation': 'md5', 'value': '0cc175b9c0f1b6a831c399e269772661'}, # Valid + {'type': 'md5', 'object_relation': 'md5', 'value': 'invalid_md5'}, # Invalid + ] + } + ] + } + } + + # Run validation + validated_event = validate_event(event_dict, errors := defaultdict(list)) # type: ignore + + # Check Attributes + self.assertEqual(len(validated_event.attributes), 3) + ip_attribute, domain_attribute, as_attribute = validated_event.attributes + self.assertEqual(ip_attribute.value, '1.1.1.1') + self.assertEqual(domain_attribute.value, 'google.com') + self.assertEqual(as_attribute.value, 65537) + + # Check Objects + self.assertEqual(len(validated_event.objects), 1) + file_object = validated_event.objects[0] + self.assertEqual(file_object.name, 'file') + self.assertEqual(len(file_object.attributes), 2) + filename_attribute, md5_attribute = file_object.attributes + self.assertEqual(filename_attribute.value, 'test.txt') + self.assertEqual(md5_attribute.value, '0cc175b9c0f1b6a831c399e269772661') + + # Check Errors + self.assertEqual(len(errors['warnings']), 3) + ip_error, *md5_errors = errors['warnings'] + self.assertIn('IP address has an invalid format.', ip_error) + for md5_error in md5_errors: + self.assertIn( + 'Checksum has an 
invalid length or format (expected: 32 hexadecimal characters).', + md5_error + ) + + def test_validate_attribute(self): + # Test with valid dict + attribute_dict = {'type': 'ip-src', 'value': '1.1.1.1'} + validated = validate_attribute(attribute_dict) + self.assertIsInstance(validated, MISPAttribute) + self.assertEqual(validated.value, '1.1.1.1') + + # Test with valid MISPAttribute + attribute = MISPAttribute() + attribute.from_dict(**attribute_dict) + validated = validate_attribute(attribute) + self.assertIsInstance(validated, MISPAttribute) + self.assertEqual(validated.value, '1.1.1.1') + + # Test with invalid dict + invalid_dict = {'type': 'ip-src', 'value': '999.999.999.999'} + with self.assertRaises(ValidationError) as cm: + validate_attribute(invalid_dict) + self.assertIn('IP address has an invalid format.', str(cm.exception)) + + # Test with invalid MISPAttribute + invalid_attribute = MISPAttribute() + invalid_attribute.from_dict(**invalid_dict) + with self.assertRaises(ValidationError) as cm: + validate_attribute(invalid_attribute) + self.assertIn('IP address has an invalid format.', str(cm.exception)) + + # Test modification + modified_dict = {'type': 'AS', 'value': 'AS123'} + validated = validate_attribute(modified_dict) + self.assertEqual(validated.value, '123') + + def test_validate_attributes(self): + attributes = [ + {'type': 'ip-src', 'value': '1.1.1.1'}, # Valid + {'type': 'ip-src', 'value': '999.999.999.999'}, # Invalid + {'type': 'domain', 'value': 'google.com'} # Valid + ] + + valid_attributes = list(validate_attributes(attributes, errors := defaultdict(list))) # type: ignore + + self.assertEqual(len(valid_attributes), 2) + self.assertEqual(valid_attributes[0].value, '1.1.1.1') + self.assertEqual(valid_attributes[1].value, 'google.com') + + self.assertEqual(len(errors['warnings']), 1) + self.assertIn('IP address has an invalid format.', errors['warnings'][0]) + + def test_validate_object(self): + object_dict = { + 'name': 'file', + 'Attribute': 
[ + {'type': 'filename', 'object_relation': 'filename', 'value': 'test.txt'}, # Valid + {'type': 'md5', 'object_relation': 'md5', 'value': 'invalid_md5'} # Invalid + ] + } + + # Test with dict + validated_object = validate_object(object_dict, errors := {}) # type: ignore + self.assertIsInstance(validated_object, MISPObject) + self.assertEqual(len(validated_object.attributes), 1) + self.assertEqual(validated_object.attributes[0].value, 'test.txt') + self.assertEqual(len(errors['warnings']), 1) + self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) + + # Test with MISPObject + misp_object = MISPObject('file') + misp_object.from_dict(**object_dict) + validated_object = validate_object(misp_object, errors := {}) + self.assertEqual(len(validated_object.attributes), 1) + self.assertEqual(validated_object.attributes[0].value, 'test.txt') + self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) + + def test_validate_objects(self): + objects = [ + { + 'name': 'file', + 'Attribute': [ + {'type': 'filename', 'object_relation': 'filename', 'value': 'test.txt'}, + {'type': 'md5', 'object_relation': 'md5', 'value': 'invalid_md5'} + ] + }, + { + 'name': 'x509', + 'Attribute': [ + {'type': 'x509-fingerprint-md5', 'object_relation': 'x509-fingerprint-md5', 'value': 'b2a5abfeef9e36964281a31e17b57c97'}, + {'type': 'datetime', 'object_relation': 'validity-not-before', 'value': '2022-01-01T00:00:00'} + ] + } + ] + + valid_objects = list(validate_objects(objects, errors := defaultdict(list))) # type: ignore + + self.assertEqual(len(valid_objects), 2) + file_object, x509_object = valid_objects + # First object should have 1 attribute (1 filtered out) + self.assertEqual(len(file_object.attributes), 1) + self.assertEqual(file_object.attributes[0].value, 'test.txt') + self.assertEqual(len(errors['warnings']), 1) + self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) + + # Second object should have 2 attributes 
+ self.assertEqual(len(x509_object.attributes), 2) + self.assertEqual(x509_object.attributes[0].value, 'b2a5abfeef9e36964281a31e17b57c97') + validity = x509_object.attributes[1].value + self.assertEqual(x509_object.attributes[1].value, datetime(2022, 1, 1, 0, 0, 0))