From 927e522fa35f0fada170b02b9f16a290d072363c Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Mon, 17 Nov 2025 23:34:26 +0100 Subject: [PATCH 01/26] add: [tools] Attribute value validation tool - Behaves like the built-in validation with `modifyBeforeValidation` and `validate` - Will also include the tooling to use this validation on PyMISP objects or JSON format to validate data that is not coming from MISP --- pymisp/tools/__init__.py | 4 +- pymisp/tools/attributevalidationtool.py | 607 ++++++++++++++++++++++++ 2 files changed, 610 insertions(+), 1 deletion(-) create mode 100644 pymisp/tools/attributevalidationtool.py diff --git a/pymisp/tools/__init__.py b/pymisp/tools/__init__.py index 45907b39c..8023fc800 100644 --- a/pymisp/tools/__init__.py +++ b/pymisp/tools/__init__.py @@ -13,6 +13,7 @@ from .asnobject import ASNObject # noqa from .geolocationobject import GeolocationObject # noqa from .git_vuln_finder_object import GitVulnFinderObject # noqa +from .attributevalidationtool import AttributeValidationTool # noqa from .vehicleobject import VehicleObject # noqa from .csvloader import CSVLoader # noqa @@ -51,5 +52,6 @@ 'GitVulnFinderObject', 'VehicleObject', 'CSVLoader', 'SSHAuthorizedKeysObject', 'feed_meta_generator', 'update_objects', 'EMailObject', 'URLObject', 'PEObject', 'PESectionObject', 'ELFObject', - 'ELFSectionObject', 'MachOObject', 'MachOSectionObject' + 'ELFSectionObject', 'MachOObject', 'MachOSectionObject', + 'AttributeValidationTool' ] diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py new file mode 100644 index 000000000..746e5517e --- /dev/null +++ b/pymisp/tools/attributevalidationtool.py @@ -0,0 +1,607 @@ +#!/usr/bin/env python3 + +import ipaddress +import json +import re +from base64 import b64decode +from datetime import datetime +from dateutil.parser import parse +from urllib.parse import urlparse + +HASH_HEX_LENGTH = { + 'authentihash': 64, + 'md5': 32, + 'imphash': 32, + 'telfhash': 70, 
+ 'sha1': 40, + 'git-commit-id': 40, + 'x509-fingerprint-md5': 32, + 'x509-fingerprint-sha1': 40, + 'x509-fingerprint-sha256': 64, + 'ja3-fingerprint-md5': 32, + 'jarm-fingerprint': 62, + 'hassh-md5': 32, + 'hasshserver-md5': 32, + 'pehash': 40, + 'sha224': 56, + 'sha256': 64, + 'sha384': 96, + 'sha512': 128, + 'sha512/224': 56, + 'sha512/256': 64, + 'sha3-224': 56, + 'sha3-256': 64, + 'sha3-384': 96, + 'sha3-512': 128, + 'dom-hash': 32, +} +HTTP_METHODS = ( + 'OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT', + 'PROPFIND', 'PROPPATCH', 'MKCOL', 'COPY', 'MOVE', 'LOCK', 'UNLOCK', + 'VERSION-CONTROL', 'REPORT', 'CHECKOUT', 'CHECKIN', 'UNCHECKOUT', + 'MKWORKSPACE', 'UPDATE', 'LABEL', 'MERGE', 'BASELINE-CONTROL', + 'MKACTIVITY', 'ORDERPATCH', 'ACL', 'PATCH', 'SEARCH' +) +VULNERABILITY_REGEXES = ( + r'CVE-\d{4}-\d{4,}', + r'GCVE-\d+-\d{4}-\d+', + r'fkie_cve-\d{4}-\d{4,}', + r'ghsa-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}', + r'pysec-\d{4}-\d{2,5}', + r'gsd-\d{4}-\d{4,5}', + r'mal-\d{4}-\d+', + r'wid-sec-w-\d{4}-\d{4}', + r'ncsc-\d{4}-\d{4}', + r'ssa-\d{6}', + r'rh(ba|ea|sa)-\d{4}:\d{4,}', + r'ics(ma|a)-\d{2}-\d{3}-\d{2}', + r'va-\d{2}-\d{3}-\d{2}', + r'cisco-sa(-[a-zA-Z0-9_]+)+', + r'sca-\d{4}-\d{4,}', + r'nn-\d{4}[:_]\d-\d{2}', + r'oxas-adv-\d{4}-\d{4}', + r'msrc_cve-\d{4}-\d{4,}', + r'var-\d{6}-\d{4}', + r'jvndb-\d{4}-\d{6}', + r'ts-\d{4}-\d{4}', + r'(open)?suse-su-\d{4}:\d{4,}-\d', + r'cnvd-\d{4}-\d{5}', + r'certfr-\d{4}-avi-\d{4}', + r'certfr-\d{4}-ale-\d{3}' +) + +CDHASH_RE = re.compile(r'^[0-9a-f]{40,}$') +EMAIL_RE = re.compile(r'^.[^\s]*\@.*\..*$', flags=re.IGNORECASE) +DOMAIN_RE = re.compile(r'^[A-Z0-9.\-_]+\.[A-Z0-9\-]{2,}$', flags=re.IGNORECASE) +HEX_RE = re.compile(r'^[0-9a-fA-F]+$') +MAC_ADDRESS_RE = re.compile(r'^([a-fA-F0-9]{2}[:]?){6}$') +MAC_EUI_64_RE = re.compile(r'^([a-fA-F0-9]{2}[:]?){8}$') +ONION_RE = re.compile(r'^([a-z2-7]{16}|[a-z2-7]{56})\.onion$') +REMOVE_NON_ALPHANUM_CAP_RE = re.compile(r'[^0-9A-Z]+') 
+REMOVE_NON_ALPHANUM_RE = re.compile(r'[^0-9A-Fa-f]') +REMOVE_NON_NUM_RE = re.compile(r'[^0-9]+') +REMOVE_PHONE_PARENTHESIS_RE = re.compile(r'\(0\)') +SANITISE_PHONE_NUMBER_RE = re.compile(r'[^\+0-9]+') +SSDEEP_RE = re.compile(r'^([0-9]+):([0-9a-zA-Z/+]*):([0-9a-zA-Z/+]*)$') +VULNERABILITY_RE = re.compile( + r'^(?:' + '|'.join(VULNERABILITY_REGEXES) + r')$', flags=re.IGNORECASE +) +WEAKNESS_RE = re.compile(r"^CWE-[0-9]+$", flags=re.IGNORECASE) + + +class AttributeValidationTool: + @classmethod + def modifyBeforeValidation(cls, attribute_type, value): + value = cls._handle_4byte_unicode(value) + match attribute_type: + case ('ip-src' | 'ip-dst'): + return cls._normalise_ip(value) + case ('md5' | 'sha1' | 'sha224' | 'sha256' | 'sha384' | 'sha512' | + 'sha512/224' | 'sha512/256' | 'sha3-224' | 'sha3-256' | + 'sha3-384' | 'sha3-512' | 'ja3-fingerprint-md5' | + 'jarm-fingerprint' | 'hassh-md5' | 'hasshserver-md5' | + 'hostname' | 'pehash' | 'authentihash' | 'vhash' | 'imphash' | + 'telfhash' | 'tlsh' | 'anonymised' | 'cdhash' | 'email' | + 'email-src' | 'email-dst' | 'target-email' | + 'whois-registrant-email' | 'dom-hash' | 'onion-address'): + return value.lower() + case 'domain': + value = value.lower().strip('.') + # Domain is not valid, try to convert to punycode + if not cls._is_domain_valid(value): + return value.encode('punycode') + return value + case 'domain|ip': + parts = value.lower().split('|') + if len(parts) != 2: + return value # not a composite + domain, ip = parts + domain = domain.strip('.') + # Domain is not valid, try to convert to punycode + if not cls._is_domain_valid(domain): + domain = domain.encode('punycode') + return f'{domain}|{cls._normalise_ip(ip)}' + case ('filename|md5' | 'filename|sha1' | 'filename|imphash' | + 'filename|sha224' | 'filename|sha256' | 'filename|sha384' | + 'filename|sha512' | 'filename|sha512/224' | + 'filename|sha512/256' | 'filename|sha3-224' | + 'filename|sha3-256' | 'filename|sha3-384' | + 'filename|sha3-512' | 
'filename|authentihash' | + 'filename|vhash' | 'filename|pehash' | 'filename|tlsh'): + # Convert hash to lowercase + filename, _hash = value.split('|', 1) + return f'{filename}|{_hash.lower()}' + case 'http-method' | 'hex': + return value.upper() + case 'vulnerability': + value = value.replace('–', '-') + source = value.split('-')[0] + if source in ('cve', 'gcve'): + return value.upper() + return value + case 'weakness': + return value.replace('–', '-').upper() + case 'cc-number' | 'bin': + return re.sub(REMOVE_NON_NUM_RE, '', value) + case 'iban' | 'bic': + return re.sub(REMOVE_NON_ALPHANUM_CAP_RE, '', value.upper()) + case 'prtn' | 'whois-registrant-phone' | 'phone-number': + if value.startswith('00'): + value = f'+{value[2:]}' + value = re.sub(REMOVE_PHONE_PARENTHESIS_RE, '', value) + return re.sub(SANITISE_PHONE_NUMBER_RE, '', value) + case 'x509-fingerprint-md5' | 'x509-fingerprint-sha256' | 'x509-fingerprint-sha1': + return value.replace(':', '').lower() + case 'ip-dst|port' | 'ip-src|port': + if value.count(':') >= 2: # (ipv6|port) - tokenize ip and port + if '|' in value: # 2001:db8::1|80 + ip, port = value.split('|', 1) + return f'{cls._normalise_ip(ip)}|{port}' + if value.startswith('[') and ']' in value: # [2001:db8::1]:80 + ip, port = value[1:].split(']', 1) + return f'{cls._normalise_ip(ip)}|{port.lstrip(":")}' + for separator in ('.', ' port ', 'p', '#'): + if separator in value: + ip, port = value.split(separator, 1) + return f'{cls._normalise_ip(ip)}|{port}' + # 2001:db8::1:80 this one is ambiguous + *parts, port = value.split(':') + return f'{cls._normalise_ip(":".join(parts))}|{port}' + for separator in (':', '|'): + if separator in value: # ipv4:port or ipv4|port + ip, port = value.split(separator, 1) + return f'{cls._normalise_ip(ip)}|{port}' + return value + case 'mac-address' | 'mac-eui-64': + value = re.sub(REMOVE_NON_ALPHANUM_RE, '', value).lower() + return ':'.join(value[i:i+2] for i in range(0, 12, 2)) + case 'hostname|port': + return 
value.replace(':', '|').lower() + case 'boolean': + if isinstance(value, int): + return bool(value) + if isinstance(value, str): + value = value.lower().strip() + if value == 'true': + return True + if value == 'false': + return False + return value + case 'datetime': + if isinstance(value, str): + try: + return datetime.fromisoformat(value) + except ValueError: + try: + return parse(value) + except Exception: + pass + return value + case 'AS': + if value.upper().startswith('AS'): + value = value[2:] # remove 'AS' + if '.' in value: # maybe value is in asdot notation + multiplier, remainder = value.split('.', 1) + if cls._is_positive_integer(multiplier) and cls._is_positive_integer(remainder): + return multiplier * 65536 + remainder + return value + case _: + return value + + @classmethod + def validate(cls, attribute_type, value): + match attribute_type: + case ('md5' | 'imphash' | 'sha1' | 'sha224' | 'sha256' | 'sha384' | + 'sha512' | 'sha512/224' | 'sha512/256' | 'sha3-224' | + 'sha3-256' | 'sha3-384' | 'sha3-512' | 'authentihash' | + 'ja3-fingerprint-md5' | 'jarm-fingerprint' | 'hassh-md5' | + 'hasshserver-md5' | 'x509-fingerprint-md5' | + 'x509-fingerprint-sha256' | 'x509-fingerprint-sha1' | + 'git-commit-id' | 'dom-hash'): + if cls._is_hash_valid(attribute_type, value): + return True + length = HASH_HEX_LENGTH[type] + return ( + 'Checksum has an invalid length or format (expected: ' + f'{length} hexadecimal characters). Please double check ' + 'the value or select type "other".' + ) + case 'tlsh': + if cls._is_tlsh_valid(value): + return True + return ( + 'Checksum has an invalid length or format (expected: at ' + 'least 35 hexadecimal characters, optionally starting ' + 'with t1 instead of hexadecimal characters). Please ' + 'double check the value or select type "other".' + ) + case 'telfhash': + if cls._is_telfhash_valid(value): + return True + return ( + 'Checksum has an invalid length or format (expected: ' + '70 or 72 hexadecimal characters). 
Please double check ' + 'the value or select type "other".' + ) + case 'pehash': + if cls._is_hash_valid('pehash', value): + return True + return ( + "The input doesn't match the expected sha1 format " + '(expected: 40 hexadecimal characters). Keep in mind that ' + 'MISP currently only supports SHA1 for PEhashes, if you ' + 'would like to get the support extended to other hash ' + 'types, make sure to create a github ticket about it at ' + 'https://github.com/MISP/MISP!' + ) + case 'ssdeep': + if cls._is_ssdeep(value): + return True + return 'Invalid SSDeep hash. The format has to be blocksize:hash:hash' + case 'impfuzzy': + if value.count(':') == 2: + imports, *_ = value.split(':') + if cls._is_positive_integer(imports): + return True + return 'Invalid impfuzzy format. The format has to be imports:hash:hash' + case 'cdhash': + if CDHASH_RE.fullmatch(value): + return True + return ( + "The input doesn't match the expected format " + '(expected: 40 or more hexadecimal characters)' + ) + case 'http-method': + if value in HTTP_METHODS: + return True + return 'Unknown HTTP method.' + case 'filename|pehash': + if re.fullmatch(r'^.+\|[0-9a-f]{40}$#', value): + return True + return ( + "The input doesn't match the expected filename|sha1 format " + '(expected: filename|40 hexadecimal characters). Keep in ' + 'mind that MISP currently only supports SHA1 for PEhashes, ' + 'if you would like to get the support extended to other ' + 'hash types, make sure to create a github ticket about it ' + 'at https://github.com/MISP/MISP!' 
+ ) + case ('filename|md5' | 'filename|sha1' | 'filename|imphash' | + 'filename|sha224' | 'filename|sha256' | 'filename|sha384' | + 'filename|sha512' | 'filename|sha512/224' | + 'filename|sha512/256' | 'filename|sha3-224' | + 'filename|sha3-256' | 'filename|sha3-384' | + 'filename|sha3-512' | 'filename|authentihash'): + length = HASH_HEX_LENGTH[attribute_type[9:]] # strip `filename|`] + if re.fullmatch(r'^.+\|[0-9a-f]{' + length + r'}$', value): + return True + return ( + 'Checksum has an invalid length or format (expected:' + f'filename|{length} hexadecimal characters). Please' + 'double check the value or select type "other".' + ) + case 'filename|ssdeep': + composite = value.split('|') + if len(composite) == 2: + filename, ssdeep = composite + if '\n' in filename: + return 'Filename must not contain new line character.' + if cls._is_ssdeep(ssdeep): + return True + return 'Invalid ssdeep hash (expected: blocksize:hash:hash).' + case 'filename|tlsh': + composite = value.split('|') + if len(composite) == 2: + filename, tlsh = composite + if '\n' in filename: + return 'Filename must not contain new line character.' + if cls._is_tlsh_valid(tlsh): + return True + return ( + 'TLSH hash has an invalid length or format (expected: ' + 'filename|at least 35 hexadecimal characters, optionally ' + 'starting with t1 instead of hexadecimal characters). ' + 'Please double check the value or select type "other".' + ) + case 'filename|vhash': + if re.fullmatch(r'^.+\|.+$', value): + return True + return ( + 'Checksum has an invalid length or format (expected: ' + 'filename|string characters). Please double check the ' + 'value or select type "other".' + ) + case 'ip-src' | 'ip-dst': + return cls._validate_ip(value) + case 'port': + if cls._is_port_valid(value): + return True + return 'Port numbers have to be integers between 1 and 65535.' 
+ case 'ip-dst|port' | 'ip-src|port': + ip, port = value.split('|', 1) + if not cls._is_port_valid(port): + return 'Port numbers have to be integers between 1 and 65535.' + return cls._validate_ip(ip) + case 'onion-address': + if ONION_RE.fullmatch(value): + return True + return 'Onion address has an invalid format.' + case 'mac-address': + if MAC_ADDRESS_RE.fullmatch(value): + return True + return 'MAC address has an invalid format.' + case 'mac-eui-64': + if MAC_EUI_64_RE.fullmatch(value): + return True + return 'MAC EUI-64 address has an invalid format.' + case 'hostname' | 'domain': + if cls._is_domain_valid(value): + return True + return ( + f'{attribute_type.capitalize()} has an invalid format. ' + 'Please double check the value or select type "other".' + ); + case 'hostname|port': + hostname, port = value.split('|', 1) + if not cls._is_domain_valid(hostname): + return 'Hostname has an invalid format.' + if not cls._is_port_valid(port): + return 'Port numbers have to be integers between 1 and 65535.' + return True + case 'domain|ip': + domain, ip = value.split('|', 1) + if not cls._is_domain_valid(domain): + return 'Domain has an invalid format.' + return cls._validate_ip(ip) + case ('email' | 'email-src' | 'eppn' | 'email-dst' | 'target-email' | + 'whois-registrant-email' | 'dns-soa-email' | 'jabber-id'): + # we don't use the native function to prevent issues with partial email addresses + if EMAIL_RE.fullmatch(value): + return True + return ( + 'Email address has an invalid format. Please double ' + 'check the value or select type "other".' + ); + case 'vulnerability': + if VULNERABILITY_RE.fullmatch(value): + return True + return 'Invalid vulnerability ID format.' + case 'weakness': + if WEAKNESS_RE.fullmatch(value): + return True + return 'Invalid format. Expected: CWE-x...' + case 'windows-service-name' | 'windows-service-displayname': + if len(value) > 256 or re.search(r'[\\/]', value): + return ( + 'Invalid format. 
Only values shorter than 256 characters ' + "that don't include any forward or backward slashes are allowed." + ) + return True + case ('mutex' | 'process-state' | 'snort' | 'bro' | 'zeek' | + 'community-id' | 'anonymised' | 'pattern-in-file' | + 'pattern-in-traffic' | 'pattern-in-memory' | 'filename-pattern' | + 'pgp-public-key' | 'pgp-private-key' | 'yara' | 'stix2-pattern' | + 'sigma' | 'gene' | 'kusto-query' | 'mime-type' | + 'identity-card-number' | 'cookie' | 'attachment' | + 'malware-sample' | 'comment' | 'text' | 'other' | 'cpe' | + 'email-attachment' | 'email-body' | 'email-header' | + 'first-name' | 'middle-name' | 'last-name' | 'full-name'): + return True + case 'link': + parsed = urlparse(value) + if all([parsed.scheme, parsed.netloc]): + return True + return 'Link has to be a valid URL.' + case 'hex': + if HEX_RE.fullmatch(value): + return True + return 'Value has to be a hexadecimal string.' + case ('target-user' | 'campaign-name' | 'campaign-id' | + 'threat-actor' | 'target-machine' | 'target-org' | + 'target-location' | 'target-external' | 'email-subject' | + 'malware-type' | 'url' | 'uri' | 'user-agent' | 'regkey' | + 'regkey|value' | 'filename' | 'pdb' | 'windows-scheduled-task' | + 'whois-registrant-name' | 'whois-registrant-org' | + 'whois-registrar' | 'whois-creation-date' | 'date-of-birth' | + 'place-of-birth' | 'gender' | 'passport-number' | + 'passport-country' | 'passport-expiration' | 'redress-number' | + 'nationality' | 'visa-number' | 'issue-date-of-the-visa' | + 'primary-residence' | 'country-of-residence' | + 'special-service-request' | 'frequent-flyer-number' | + 'travel-details' | 'payment-details' | + 'place-port-of-original-embarkation' | 'place-port-of-clearance' | + 'place-port-of-onward-foreign-destination' | + 'passenger-name-record-locator-number' | + 'email-dst-display-name' | 'email-src-display-name' | + 'email-reply-to' | 'email-x-mailer' | 'email-mime-boundary' | + 'email-thread-index' | 'email-message-id' | 
'github-username' | + 'github-repository' | 'github-organisation' | 'twitter-id' | + 'dkim' | 'dkim-signature' | 'favicon-mmh3' | + 'chrome-extension-id' | 'mobile-application-id' | + 'azure-application-id' | 'named pipe'): + if '\n' in value: + return 'Value must not contain new line character.' + return True + case 'ssh-fingerprint': + if cls._is_ssh_fingerprint(value): + return True + return 'SSH fingerprint must be in MD5 or SHA256 format.' + case 'datetime': + try: + parse(value) + return True + except Exception: + return 'Datetime has to be in the ISO 8601 format.' + case 'size-in-bytes' | 'counter': + if cls._is_positive_integer(value): + return True + return 'The value has to be a whole number greater or equal 0.' + # case 'targeted-threat-index': + # if (!is_numeric($value) || $value < 0 || $value > 10) { + # return __('The value has to be a number between 0 and 10.'); + # } + # return True + case 'integer': + try: + int(value) + return True + except ValueError: + return 'The value has to be an integer value.' + case 'iban' | 'bic' | 'btc' | 'dash' | 'xmr': + if value.isalnum(): + return True + return f'{attribute_type.upper()} has to be alphanumeric.' + case 'vhash': + if len(value) > 0: + return True + return 'Vhash must not be an empty string.' + case ('bin' | 'cc-number' | 'bank-account-nr' | 'aba-rtn' | 'prtn' | + 'phone-number' | 'whois-registrant-phone' | 'float'): + try: + float(value) + return True + except ValueError: + return f'The value has to be a valid {attribute_type}' + case 'cortex': + try: + json.loads(value) + return True + except json.JSONDecodeError: + return 'The Cortex analysis result has to be a valid JSON string.' + case 'boolean': + if isinstance(value, bool): + return True + return 'The value has to be either true or false.' 
+ case 'AS': + if cls._is_positive_integer(value) and int(value) <= 4294967295: + return True + return 'AS number have to be integer between 1 and 4294967295' + case _: + return value + + @staticmethod + def _handle_4byte_unicode(value): + # Replace 4-byte UTF-8 characters with '?' + return ''.join(ch if ord(ch) <= 0xFFFF else '?' for ch in value) + + @staticmethod + def _is_domain_valid(value): + return DOMAIN_RE.fullmatch(value) + + @staticmethod + def _is_hash_valid(attribute_type, value): + return len(value) == HASH_HEX_LENGTH[attribute_type] and HEX_RE.fullmatch(value) + + @classmethod + def _is_port_valid(cls, value): + return cls._is_positive_integer(value) and int(value) in range(1, 65536) + + @staticmethod + def _is_positive_integer(value): + return value.isdigit() and int(value) >= 0 + + @staticmethod + def _is_ssdeep(value): + return SSDEEP_RE.fullmatch(value) + + @classmethod + def _is_ssh_fingerprint(cls, value): + if value.startswith('SHA256:'): + try: + decoded = b64decode(value[7:]) + except Exception: + return False + return decoded is not None and len(decoded) == 32 + if value.startswith('MD5:'): + return cls._is_hash_valid('md5', value[3:].replace(':', '')) + return cls._is_hash_valid('md5', value.replace(':', '')) + + @staticmethod + def _is_tlsh_valid(value): + if value.startswith('t'): + value = value.lstrip('t') + return len(value) > 35 and HEX_RE.fullmatch(value) + + @staticmethod + def _is_telfhash_valid(value): + return len(value) in (70, 72) + + @staticmethod + def _normalise_ip(value): + # If IP is a CIDR + if '/' in value: + address, length = value.split('/', 2) + if ':' in address: + try: + address = str(ipaddress.IPv6Address(address)) + except ipaddress.AddressValueError: + return value + if length == '128': + return address + else: + try: + address = str(ipaddress.IPv4Address(address)) + except ipaddress.AddressValueError: + return value + if length == '32': + return address + return f'{address}/{length}' + try: + return ( + 
str(ipaddress.IPv6Address(value)) + if ':' in value + else str(ipaddress.IPv4Address(value)) + ) + except ipaddress.AddressValueError: + return value + + @classmethod + def _validate_ip(cls, value): + if '/' in value: + composite = value.split('/') + if len(composite) != 2 or not cls._is_positive_integer(composite[1]): + return ('Invalid CIDR notation value found.') + address, length = composite + try: + ip_obj = ipaddress.ip_address(address) + if isinstance(ip_obj, ipaddress.IPv4Address): + if int(length) > 32: + return ( + 'Invalid CIDR notation value found, for ' + 'IPv4 must be lower or equal 32.' + ) + return True + if isinstance(ip_obj, ipaddress.IPv6Address): + if int(length) > 128: + return ( + 'Invalid CIDR notation value found, for ' + 'IPv6 must be lower or equal 128.' + ) + return True + except ValueError: + return 'IP address has an invalid format.' + try: + ipaddress.ip_address(value) + except ValueError: + return 'IP address has an invalid format.' + return True From 4c1b199efd6493bb919d082079546df0c714b9f0 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Tue, 18 Nov 2025 23:06:59 +0100 Subject: [PATCH 02/26] add: [tools] Validating `uuid` attribute type --- pymisp/tools/attributevalidationtool.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 746e5517e..aa613a874 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -83,6 +83,7 @@ REMOVE_PHONE_PARENTHESIS_RE = re.compile(r'\(0\)') SANITISE_PHONE_NUMBER_RE = re.compile(r'[^\+0-9]+') SSDEEP_RE = re.compile(r'^([0-9]+):([0-9a-zA-Z/+]*):([0-9a-zA-Z/+]*)$') +UUID_RE = re.compile(r'[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$') VULNERABILITY_RE = re.compile( r'^(?:' + '|'.join(VULNERABILITY_REGEXES) + r')$', flags=re.IGNORECASE ) @@ -496,6 +497,10 @@ def validate(cls, attribute_type, value): if cls._is_positive_integer(value) and int(value) <= 
4294967295: return True return 'AS number have to be integer between 1 and 4294967295' + case 'uuid': + if UUID_RE.fullmatch(value): + return True + return 'The value has to be a valid UUID format.' case _: return value From 78c57ddb4ce23232a404abd622e9b75fe2b37b4b Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Tue, 18 Nov 2025 23:07:44 +0100 Subject: [PATCH 03/26] add: [tools] Added the refanging feature with the value modification that happens before validation --- pymisp/tools/attributevalidationtool.py | 41 +++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index aa613a874..d5f82b8e7 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -42,6 +42,36 @@ 'MKWORKSPACE', 'UPDATE', 'LABEL', 'MERGE', 'BASELINE-CONTROL', 'MKACTIVITY', 'ORDERPATCH', 'ACL', 'PATCH', 'SEARCH' ) +REFANG_REGEX_TABLE = ( + { + 'from': re.compile(r'^(hxxp|hxtp|htxp|meow|h\[tt\]p)', re.IGNORECASE), + 'to': 'http', + 'types': ('link', 'url') + }, + { + 'from': re.compile(r'(\[\.\]|\[dot\]|\(dot\))', re.IGNORECASE), + 'to': '.', + 'types': ( + 'link', 'url', 'ip-dst', 'ip-src', 'domain|ip', 'domain', + 'hostname', 'email', 'email-src', 'email-dst' + ) + }, + { + 'from': re.compile(r'\[hxxp:\/\/\]', re.IGNORECASE), + 'to': 'http', + 'types': ('link', 'url') + }, + { + 'from': re.compile(r'\[\@\]|\[at\]', re.IGNORECASE), + 'to': '@', + 'types': ('email', 'email-src', 'email-dst') + }, + { + 'from': re.compile(r'\[:\]'), + 'to': ':', + 'types': ('link', 'url') + } +) VULNERABILITY_REGEXES = ( r'CVE-\d{4}-\d{4,}', r'GCVE-\d+-\d{4}-\d+', @@ -93,7 +123,7 @@ class AttributeValidationTool: @classmethod def modifyBeforeValidation(cls, attribute_type, value): - value = cls._handle_4byte_unicode(value) + value = cls._refang_value(attribute_type, value.strip()) match attribute_type: case ('ip-src' | 'ip-dst'): return 
cls._normalise_ip(value) @@ -182,7 +212,7 @@ def modifyBeforeValidation(cls, attribute_type, value): if isinstance(value, int): return bool(value) if isinstance(value, str): - value = value.lower().strip() + value = value.lower() if value == 'true': return True if value == 'false': @@ -580,6 +610,13 @@ def _normalise_ip(value): except ipaddress.AddressValueError: return value + @classmethod + def _refang_value(cls, attribute_type, value): + for rule in REFANG_REGEX_TABLE: + if attribute_type in rule['types']: + value = rule['from'].sub(rule['to'], value) + return cls._handle_4byte_unicode(value) + @classmethod def _validate_ip(cls, value): if '/' in value: From 8456e78a1581a6dcca2b3a28f376d8e697a3d001 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Wed, 19 Nov 2025 18:30:41 +0100 Subject: [PATCH 04/26] fix: [tools] Fixed modification for `domain` and validation for `AS` & `filename|hash` attributes --- pymisp/tools/attributevalidationtool.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index d5f82b8e7..8859e5708 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -140,7 +140,7 @@ def modifyBeforeValidation(cls, attribute_type, value): value = value.lower().strip('.') # Domain is not valid, try to convert to punycode if not cls._is_domain_valid(value): - return value.encode('punycode') + return value.encode('idna').decode('ascii') return value case 'domain|ip': parts = value.lower().split('|') @@ -150,7 +150,7 @@ def modifyBeforeValidation(cls, attribute_type, value): domain = domain.strip('.') # Domain is not valid, try to convert to punycode if not cls._is_domain_valid(domain): - domain = domain.encode('punycode') + domain = domain.encode('idna').decode('ascii') return f'{domain}|{cls._normalise_ip(ip)}' case ('filename|md5' | 'filename|sha1' | 'filename|imphash' | 'filename|sha224' | 
'filename|sha256' | 'filename|sha384' | @@ -324,7 +324,7 @@ def validate(cls, attribute_type, value): 'filename|sha3-256' | 'filename|sha3-384' | 'filename|sha3-512' | 'filename|authentihash'): length = HASH_HEX_LENGTH[attribute_type[9:]] # strip `filename|`] - if re.fullmatch(r'^.+\|[0-9a-f]{' + length + r'}$', value): + if re.fullmatch(r'^.+\|[0-9a-f]{' + str(length) + r'}$', value): return True return ( 'Checksum has an invalid length or format (expected:' @@ -552,7 +552,9 @@ def _is_port_valid(cls, value): return cls._is_positive_integer(value) and int(value) in range(1, 65536) @staticmethod - def _is_positive_integer(value): + def _is_positive_integer(value: int | str) -> bool: + if isinstance(value, int): + return value >= 0 return value.isdigit() and int(value) >= 0 @staticmethod From 90b89ccd236d682ed92a1305479190bea15badbd Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Wed, 19 Nov 2025 23:23:42 +0100 Subject: [PATCH 05/26] fix: [tools] Fixed modification for non str values and filename|hash attributes - Avoiding issues with non str values that don't support some str built-in operations - Avoiding issues with wrong filename|hash value format --- pymisp/tools/attributevalidationtool.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 8859e5708..4d8276f44 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -123,7 +123,8 @@ class AttributeValidationTool: @classmethod def modifyBeforeValidation(cls, attribute_type, value): - value = cls._refang_value(attribute_type, value.strip()) + if isinstance(value, str): + value = cls._refang_value(attribute_type, value.strip()) match attribute_type: case ('ip-src' | 'ip-dst'): return cls._normalise_ip(value) @@ -160,7 +161,10 @@ def modifyBeforeValidation(cls, attribute_type, value): 'filename|sha3-512' | 'filename|authentihash' | 'filename|vhash' | 
'filename|pehash' | 'filename|tlsh'): # Convert hash to lowercase - filename, _hash = value.split('|', 1) + composite = value.split('|') + if len(composite) != 2: + return value # not a composite + filename, _hash = composite return f'{filename}|{_hash.lower()}' case 'http-method' | 'hex': return value.upper() From 3b36378715ccd23a7a827d2f1d9e1a69896e804c Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Wed, 19 Nov 2025 23:26:12 +0100 Subject: [PATCH 06/26] fix: [tools] Fixed validation of some composite attributes - Avoiding issues with wrong composite attribute value format --- pymisp/tools/attributevalidationtool.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 4d8276f44..f1bb6a130 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -373,7 +373,10 @@ def validate(cls, attribute_type, value): return True return 'Port numbers have to be integers between 1 and 65535.' case 'ip-dst|port' | 'ip-src|port': - ip, port = value.split('|', 1) + composite = value.split('|') + if len(composite) != 2: + return 'Invalid ip-dst|port format.' + ip, port = composite if not cls._is_port_valid(port): return 'Port numbers have to be integers between 1 and 65535.' return cls._validate_ip(ip) @@ -397,14 +400,20 @@ def validate(cls, attribute_type, value): 'Please double check the value or select type "other".' ); case 'hostname|port': - hostname, port = value.split('|', 1) + composite = value.split('|') + if len(composite) != 2: + return 'Invalid hostname|port format.' + hostname, port = composite if not cls._is_domain_valid(hostname): return 'Hostname has an invalid format.' if not cls._is_port_valid(port): return 'Port numbers have to be integers between 1 and 65535.' 
return True case 'domain|ip': - domain, ip = value.split('|', 1) + composite = value.split('|') + if len(composite) != 2: + return 'Invalid domain|ip format.' + domain, ip = composite if not cls._is_domain_valid(domain): return 'Domain has an invalid format.' return cls._validate_ip(ip) From 85e698a8dfadd3b24ff5e4c7972aa4b8ff5531c5 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 20 Nov 2025 11:42:41 +0100 Subject: [PATCH 07/26] fix: [tools] Making sure we multiply integer and not a string... --- pymisp/tools/attributevalidationtool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index f1bb6a130..e48ab8418 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -238,7 +238,7 @@ def modifyBeforeValidation(cls, attribute_type, value): if '.' in value: # maybe value is in asdot notation multiplier, remainder = value.split('.', 1) if cls._is_positive_integer(multiplier) and cls._is_positive_integer(remainder): - return multiplier * 65536 + remainder + return int(multiplier) * 65536 + int(remainder) return value case _: return value From 89aceaa8cb62419bd9b4769b30eb9abd9d24f658 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 20 Nov 2025 15:20:13 +0100 Subject: [PATCH 08/26] fix: [tools] Fixed validation for `mac-address`, `mac-eui-64` & `telfhash` attributes --- pymisp/tools/attributevalidationtool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index e48ab8418..8f80255fc 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -104,8 +104,8 @@ EMAIL_RE = re.compile(r'^.[^\s]*\@.*\..*$', flags=re.IGNORECASE) DOMAIN_RE = re.compile(r'^[A-Z0-9.\-_]+\.[A-Z0-9\-]{2,}$', flags=re.IGNORECASE) HEX_RE = re.compile(r'^[0-9a-fA-F]+$') -MAC_ADDRESS_RE = 
re.compile(r'^([a-fA-F0-9]{2}[:]?){6}$') -MAC_EUI_64_RE = re.compile(r'^([a-fA-F0-9]{2}[:]?){8}$') +MAC_ADDRESS_RE = re.compile(r'^([a-f0-9]{2}:){5}[a-f0-9]{2}$') +MAC_EUI_64_RE = re.compile(r'^([a-f0-9]{2}:){3}ff:fe:(:[a-f0-9]{2}){3}$') ONION_RE = re.compile(r'^([a-z2-7]{16}|[a-z2-7]{56})\.onion$') REMOVE_NON_ALPHANUM_CAP_RE = re.compile(r'[^0-9A-Z]+') REMOVE_NON_ALPHANUM_RE = re.compile(r'[^0-9A-Fa-f]') @@ -594,7 +594,7 @@ def _is_tlsh_valid(value): @staticmethod def _is_telfhash_valid(value): - return len(value) in (70, 72) + return len(value) in (70, 72) and HEX_RE.fullmatch(value) @staticmethod def _normalise_ip(value): From 9c7890d779b7edb928b0649ed34ee5bbf2731d81 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 20 Nov 2025 19:26:22 +0100 Subject: [PATCH 09/26] fix: [tools] Fixed variable name --- pymisp/tools/attributevalidationtool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 8f80255fc..480cb4b4b 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -255,7 +255,7 @@ def validate(cls, attribute_type, value): 'git-commit-id' | 'dom-hash'): if cls._is_hash_valid(attribute_type, value): return True - length = HASH_HEX_LENGTH[type] + length = HASH_HEX_LENGTH[attribute_type] return ( 'Checksum has an invalid length or format (expected: ' f'{length} hexadecimal characters). 
Please double check ' From 05e900f8bf6514f399c135df5fa789a856c3d0d3 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 20 Nov 2025 19:27:52 +0100 Subject: [PATCH 10/26] add: [tools] Added method to validate Attributes and Object Attributes in events - Takes dict or MISPEvent as input - Prepares the attribute values for validation - Validates the attribute values - Returns the validated Event with only valid Attributes - Logs warning messages with the skipped values --- pymisp/tools/__init__.py | 4 +- pymisp/tools/attributevalidationtool.py | 61 +++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/pymisp/tools/__init__.py b/pymisp/tools/__init__.py index 8023fc800..54b13e65c 100644 --- a/pymisp/tools/__init__.py +++ b/pymisp/tools/__init__.py @@ -13,7 +13,7 @@ from .asnobject import ASNObject # noqa from .geolocationobject import GeolocationObject # noqa from .git_vuln_finder_object import GitVulnFinderObject # noqa -from .attributevalidationtool import AttributeValidationTool # noqa +from .attributevalidationtool import AttributeValidationTool, validate_event # noqa from .vehicleobject import VehicleObject # noqa from .csvloader import CSVLoader # noqa @@ -53,5 +53,5 @@ 'SSHAuthorizedKeysObject', 'feed_meta_generator', 'update_objects', 'EMailObject', 'URLObject', 'PEObject', 'PESectionObject', 'ELFObject', 'ELFSectionObject', 'MachOObject', 'MachOSectionObject', - 'AttributeValidationTool' + 'AttributeValidationTool', 'validate_event' ] diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 480cb4b4b..a3fa0702a 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -2,10 +2,13 @@ import ipaddress import json +import logging import re from base64 import b64decode from datetime import datetime from dateutil.parser import parse +from pymisp import MISPAttribute, MISPEvent, MISPObject +from typing import Generator from urllib.parse 
import urlparse HASH_HEX_LENGTH = { @@ -119,6 +122,8 @@ ) WEAKNESS_RE = re.compile(r"^CWE-[0-9]+$", flags=re.IGNORECASE) +logger = logging.getLogger('pymisp') + class AttributeValidationTool: @classmethod @@ -662,3 +667,59 @@ def _validate_ip(cls, value): except ValueError: return 'IP address has an invalid format.' return True + + +def validate_event(event: dict | MISPEvent) -> MISPEvent: + """ + Validate event attributes and skip/remove any that don't validate. + Replicates MISP server-side validation behavior. + + :param event: MISPEvent object or dict representing an event + :return: MISPEvent with only valid attributes + """ + try: + if isinstance(event, dict): + event = _load_misp_event(event) + # Validation of Attributes + event.attributes = list(_validate_attributes(event.attributes)) + # Validation of Objects + for misp_object in event.objects: + misp_object.attributes = list(_validate_object_attributes(misp_object)) + except Exception as e: + logger.error(f'Failed to validate event: {e}') + return event + + +def _load_misp_event(event: dict) -> MISPEvent: + misp_event = MISPEvent() + misp_event.from_dict(**event) + return misp_event + + +def _message_logging(validated: str, attribute: MISPAttribute, misp_object: MISPObject | None = None) -> str: + message = f'Failed validation for {attribute.type} Attribute <{attribute.uuid}>' + if misp_object is not None: + message = f'{message} in {misp_object.name} Object <{misp_object.uuid}>' + return f'{message}:\n{attribute.value} - {validated}' + + +def _validate_attributes(attributes: list) -> Generator: + for attribute in attributes: + value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) + validated = AttributeValidationTool.validate(attribute.type, value) + if validated is not True: + logger.warning(_message_logging(validated, attribute)) + continue + attribute.value = value + yield attribute + + +def _validate_object_attributes(misp_object: MISPObject) -> Generator: + for 
attribute in misp_object.attributes: + value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) + validated = AttributeValidationTool.validate(attribute.type, value) + if validated is not True: + logger.warning(_message_logging(validated, attribute, misp_object)) + continue + attribute.value = value + yield attribute From 3f56f0eb550c078051bc6f8a22a3ea7ddff53cc6 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 20 Nov 2025 19:40:00 +0100 Subject: [PATCH 11/26] add: [tests] Testing the Attribute validation tool --- tests/test_attributevalidationtool.py | 288 ++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 tests/test_attributevalidationtool.py diff --git a/tests/test_attributevalidationtool.py b/tests/test_attributevalidationtool.py new file mode 100644 index 000000000..937f91001 --- /dev/null +++ b/tests/test_attributevalidationtool.py @@ -0,0 +1,288 @@ +import unittest +from pymisp.tools import AttributeValidationTool, validate_event + +class TestAttributeValidationTool(unittest.TestCase): + + def _should_be_valid(self, type_, *values): + for value in values: + self.assertTrue(AttributeValidationTool.validate(type_, value)) + + def _should_be_invalid(self, type_, *values): + for value in values: + self.assertNotEqual( + AttributeValidationTool.validate(type_, value), True + ) + + def test_modify_before_validation_as(self): + self.assertEqual('123', AttributeValidationTool.modifyBeforeValidation('AS', 'AS123')) + self.assertEqual(65537, AttributeValidationTool.modifyBeforeValidation('AS', '1.1')) + + def test_modify_before_validation_boolean(self): + self.assertEqual(True, AttributeValidationTool.modifyBeforeValidation('boolean', 'True')) + self.assertEqual(False, AttributeValidationTool.modifyBeforeValidation('boolean', 'False')) + self.assertEqual(True, AttributeValidationTool.modifyBeforeValidation('boolean', 1)) + self.assertEqual(False, 
AttributeValidationTool.modifyBeforeValidation('boolean', 0)) + + def test_modify_before_validation_domain(self): + self.assertEqual('example.com', AttributeValidationTool.modifyBeforeValidation('domain', 'example.com')) + self.assertEqual('example.com', AttributeValidationTool.modifyBeforeValidation('domain', 'EXAMPLE.COM')) + self.assertEqual('example.com|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'example.com|127.0.0.1')) + self.assertEqual('example.com|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'EXAMPLE.COM|127.0.0.1')) + self.assertEqual('xn--hkyrky-ptac70bc.cz', AttributeValidationTool.modifyBeforeValidation('domain', 'háčkyčárky.cz')) + self.assertEqual('xn--hkyrky-ptac70bc.cz', AttributeValidationTool.modifyBeforeValidation('domain', 'HÁČKYČÁRKY.CZ')) + self.assertEqual('xn--hkyrky-ptac70bc.cz|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'háčkyčárky.cz|127.0.0.1')) + self.assertEqual('xn--hkyrky-ptac70bc.cz|127.0.0.1', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'HÁČKYČÁRKY.CZ|127.0.0.1')) + + def test_modify_before_validation_filename_hash(self): + self.assertEqual('CMD.EXE|0cc175b9c0f1b6a831c399e269772661', AttributeValidationTool.modifyBeforeValidation('filename|md5', 'CMD.EXE|0CC175B9C0F1B6A831C399E269772661')) + + def test_modify_before_validation_financial(self): + self.assertEqual('123456', AttributeValidationTool.modifyBeforeValidation('cc-number', '1234-56')) + self.assertEqual('123456', AttributeValidationTool.modifyBeforeValidation('bin', '1234 56')) + self.assertEqual('ABC12', AttributeValidationTool.modifyBeforeValidation('iban', 'abc-12')) + self.assertEqual('ABC12', AttributeValidationTool.modifyBeforeValidation('bic', 'abc 12')) + + def test_modify_before_validation_hostname(self): + self.assertEqual('example.com|80', AttributeValidationTool.modifyBeforeValidation('hostname|port', 'example.com:80')) + self.assertEqual('example.com|80', 
AttributeValidationTool.modifyBeforeValidation('hostname|port', 'EXAMPLE.COM:80')) + + def test_modify_before_validation_ip(self): + self.assertEqual('127.0.0.1', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1/32')) + self.assertEqual('127.0.0.1/31', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1/31')) + self.assertEqual('example.com|1234:fd2:5621:1:89::4500', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'example.com|1234:0fd2:5621:0001:0089:0000:0000:4500/128')) + self.assertEqual('1234:fd2:5621:1:89::4500|80', AttributeValidationTool.modifyBeforeValidation('ip-src|port', '1234:0fd2:5621:0001:0089:0000:0000:4500/128|80')) + self.assertEqual('1234:fd2:5621:1:89::4500/127|80', AttributeValidationTool.modifyBeforeValidation('ip-src|port', '1234:0fd2:5621:0001:0089:0000:0000:4500/127|80')) + self.assertEqual('127.0.0.1', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1')) + + def test_modify_before_validation_ipv6(self): + self.assertEqual('1234:fd2:5621:1:89::4500', AttributeValidationTool.modifyBeforeValidation('ip-src', '1234:0fd2:5621:0001:0089:0000:0000:4500')) + self.assertEqual('example.com|1234:fd2:5621:1:89::4500', AttributeValidationTool.modifyBeforeValidation('domain|ip', 'example.com|1234:0fd2:5621:0001:0089:0000:0000:4500')) + self.assertEqual('1234:fd2:5621:1:89::4500|80', AttributeValidationTool.modifyBeforeValidation('ip-src|port', '1234:0fd2:5621:0001:0089:0000:0000:4500|80')) + self.assertEqual('127.0.0.1', AttributeValidationTool.modifyBeforeValidation('ip-src', '127.0.0.1')) + + def test_modify_before_validation_hashes(self): + # Hashes should be lowercased + for type_ in ['md5', 'sha1', 'sha256', 'email', 'hostname']: + self.assertEqual('abc', AttributeValidationTool.modifyBeforeValidation(type_, 'ABC')) + self.assertEqual('abc', AttributeValidationTool.modifyBeforeValidation(type_, ' AbC ')) + + def test_modify_before_validation_mac(self): + 
self.assertEqual('aa:bb:cc:dd:ee:ff', AttributeValidationTool.modifyBeforeValidation('mac-address', 'AA-BB-CC-DD-EE-FF')) + self.assertEqual('aa:bb:cc:dd:ee:ff', AttributeValidationTool.modifyBeforeValidation('mac-address', 'aabbccddeeff')) + + def test_modify_before_validation_phone(self): + self.assertEqual('+123456', AttributeValidationTool.modifyBeforeValidation('phone-number', '00123456')) + self.assertEqual('+123456', AttributeValidationTool.modifyBeforeValidation('phone-number', '+1 (23) 456')) + self.assertEqual('+123456', AttributeValidationTool.modifyBeforeValidation('prtn', '00123456')) + + def test_modify_before_validation_uppercase(self): + # HTTP methods and hex should be uppercased + self.assertEqual('POST', AttributeValidationTool.modifyBeforeValidation('http-method', 'post')) + self.assertEqual('AABB', AttributeValidationTool.modifyBeforeValidation('hex', 'aabb')) + + def test_modify_before_validation_vulnerability(self): + self.assertEqual('CVE-2020-1234', AttributeValidationTool.modifyBeforeValidation('vulnerability', 'CVE-2020-1234')) + self.assertEqual('CVE-2020-1234', AttributeValidationTool.modifyBeforeValidation('vulnerability', 'cve-2020-1234')) + self.assertEqual('CVE-2020-1234', AttributeValidationTool.modifyBeforeValidation('vulnerability', 'CVE–2020–1234')) # en-dash + + def test_modify_before_validation_weakness(self): + self.assertEqual('CWE-89', AttributeValidationTool.modifyBeforeValidation('weakness', 'cwe-89')) + self.assertEqual('CWE-89', AttributeValidationTool.modifyBeforeValidation('weakness', 'CWE–89')) + + def test_modify_before_validation_x509(self): + self.assertEqual('aa6fc83f37787abea6be2c5126163fd3', AttributeValidationTool.modifyBeforeValidation('x509-fingerprint-md5', 'AA:6F:C8:3F:37:78:7A:BE:A6:BE:2C:51:26:16:3F:D3')) + + def test_validate_as(self): + self._should_be_valid('AS', '0', 0, 1, '1', 4294967295, '1.1') + self._should_be_invalid('AS', '1.2.3.4') + + def test_validate_domain_ip(self): + 
self._should_be_valid( + 'domain|ip', 'example.com|127.0.0.1', 'example.com|::1' + ) + self._should_be_invalid( + 'domain|ip', 'example.com|127', 'example.com|1', + ) + + def test_validate_filename(self): + self._should_be_valid('filename', 'cmd.exe', 'cmd.com') + self._should_be_invalid('filename', 'cmd.exe\ncmd.com') + self._should_be_valid( + 'filename|md5', 'cmd.exe|0cc175b9c0f1b6a831c399e269772661', + 'cmd.com|0cc175b9c0f1b6a831c399e269772661' + ) + self._should_be_invalid('filename|md5', 'cmd.exe\ncmd.com|0cc175b9c0f1b6a831c399e269772661') + + def test_validate_hashes(self): + self._should_be_valid('filename|md5', 'cmd.exe|0cc175b9c0f1b6a831c399e269772661') + self._should_be_invalid('filename|md5', 'cmd.exe|86f7e437faa5a7fce15d1ddcb9eaeaea377667b8') + self._should_be_valid( + 'tlsh', + 'b2317c38fac0333c8ff7d3ff31fcf3b7fb3f9a3ef3bf3c880cfc43ebf97f3cc73fbfc', + 't1fdd4e000b6a1c034f1f612f849b6a3a4b53f7ea1677481cf12d916ea4a79af1ed31317' + ) + self._should_be_valid( + 'filename|tlsh', + 'cmd.exe|b2317c38fac0333c8ff7d3ff31fcf3b7fb3f9a3ef3bf3c880cfc43ebf97f3cc73fbfc', + 'cmd.exe|t1fdd4e000b6a1c034f1f612f849b6a3a4b53f7ea1677481cf12d916ea4a79af1ed31317' + ) + self._should_be_valid( + 'ssdeep', + '96:s4Ud1Lj96tHHlZDrwciQmA+4uy1I0G4HYuL8N3TzS8QsO/wqWXLcMSx:sF1LjEtHHlZDrJzrhuyZvHYm8tKp/RWO', + '384:EWo4X1WaPW9ZWhWzLo+lWpct/fWbkWsWIwW0/S7dZhgG8:EWo4X1WmW9ZWhWH/WpchfWgWsWTWtf8', + '6144:3wSQSlrBHFjOvwYAU/Fsgi/2WDg5+YaNk5xcHrYw+Zg+XrZsGEREYRGAFU25ttR/:ctM7E0L4q' + ) + self._should_be_valid( + 'filename|ssdeep', + 'ahoj.txt|96:s4Ud1Lj96tHHlZDrwciQmA+4uy1I0G4HYuL8N3TzS8QsO/wqWXLcMSx:sF1LjEtHHlZDrJzrhuyZvHYm8tKp/RWO' + ) + self._should_be_valid('dom-hash', '0cc175b9c0f1b6a831c399e269772661') + + self._should_be_valid('telfhash', 'a' * 70, 'a' * 72) + self._should_be_invalid('telfhash', 'a' * 69, 'z' * 70) # z is not hex + + self._should_be_valid('pehash', 'a' * 40) + self._should_be_invalid('pehash', 'a' * 39, 'z' * 40) + + self._should_be_valid('impfuzzy', '3:aabbcc:ddeeff') 
+ self._should_be_invalid('impfuzzy', '3:aabbcc', 'x:aabbcc:ddeeff') + + self._should_be_valid('cdhash', 'a' * 40) + self._should_be_invalid('cdhash', 'a' * 39, 'z' * 40) + + self._should_be_valid('filename|vhash', 'file.txt|vhash123') + self._should_be_invalid('filename|vhash', 'file.txt') + + def test_validate_identifiers(self): + self._should_be_valid('vulnerability', 'CVE-2020-1234', 'GHSA-1234-1234-1234') + self._should_be_invalid('vulnerability', 'CVE-2020', 'invalid') + + self._should_be_valid('weakness', 'CWE-89') + self._should_be_invalid('weakness', 'CWE-ABC', 'invalid') + + self._should_be_valid('uuid', '123e4567-e89b-12d3-a456-426614174000') + self._should_be_invalid('uuid', '123e4567-e89b-12d3-a456-42661417400g', '123e4567-e89b-12d3-a456-42661417400') + + self._should_be_valid('target-user', 'user1') + self._should_be_invalid('target-user', 'user1\nuser2') + + def test_validate_ip(self): + for type_ in ['ip-src', 'ip-dst']: + self._should_be_valid( + type_, '127.0.0.1', '127.0.0.1/32', '::1', '::1/128' + ) + self._should_be_invalid( + type_, '127','127.0.0.', '127.0.0.1/', '127.0.0.1/32/1', + '127.0.0.1/128', '::1/257', '::1/257', '::1/128/1' + ) + + def test_validate_misc(self): + self._should_be_valid('windows-service-name', 'service1') + self._should_be_invalid('windows-service-name', 'service/1', 'service\\1', 'a' * 257) + + self._should_be_valid('link', 'http://example.com', 'ftp://example.com') + self._should_be_invalid('link', 'example.com') + + self._should_be_valid('hex', 'aabbcc') + self._should_be_invalid('hex', 'zz') + + self._should_be_valid('datetime', '2020-01-01T00:00:00') + self._should_be_invalid('datetime', 'invalid') + + self._should_be_valid('size-in-bytes', '1024', 1024) + self._should_be_invalid('size-in-bytes', '-1', 'abc') + self._should_be_valid('integer', '123', '-123') + self._should_be_invalid('integer', 'abc') + self._should_be_valid('float', '1.23', '-1.23') + self._should_be_invalid('float', 'abc') + + 
self._should_be_valid('iban', 'ALPHANUM123') + self._should_be_invalid('iban', 'invalid-char!') + self._should_be_valid('btc', '1BvBMSEYstWetqTFn5Au4m4GFg7xJaNVN2') + self._should_be_invalid('btc', 'invalid!') + + self._should_be_valid('cortex', '{"a": 1}') + self._should_be_invalid('cortex', '{a: 1}') + + self._should_be_valid('boolean', True, False) + self._should_be_invalid('boolean', 'maybe') + + def test_validate_networking(self): + self._should_be_valid('ip-dst|port', '127.0.0.1|80', '::1|80') + self._should_be_invalid('ip-dst|port', '127.0.0.1', '127.0.0.1|99999') + + self._should_be_valid('onion-address', 'abcdefghijklmnop.onion', 'abcdefghijklmnopqrstuvwxyz234567abcdefghijklmnopqrstuvwxyz23.onion') + self._should_be_invalid('onion-address', 'invalid.onion', 'abc.onion') + + self._should_be_valid('mac-address', 'aa:bb:cc:dd:ee:ff') + self._should_be_invalid('mac-address', 'aa:bb:cc:dd:ee:gg', 'aabbccddeeff') + + self._should_be_valid('mac-eui-64', 'aa:bb:cc:ff:fe:dd:ee:11') + self._should_be_invalid('mac-eui-64', 'aa:bb:cc:dd:ee:ff:00:11', 'aa:bb:cc:dd:ee:ff:00:gg') + + self._should_be_valid('hostname|port', 'example.com|80') + self._should_be_invalid('hostname|port', 'example.com', 'example.com|99999', 'invalid_domain|80') + + self._should_be_valid('email', 'test@example.com', 'a.b@c.d') + self._should_be_invalid('email', 'test@example', 'test.com') + + self._should_be_valid('http-method', 'GET', 'POST') + self._should_be_invalid('http-method', 'get', 'FIND') + + def test_validate_port(self): + self.assertTrue(AttributeValidationTool.validate('port', '1')) + self.assertTrue(AttributeValidationTool.validate('port', 1)) + self.assertTrue(AttributeValidationTool.validate('port', 80)) + self.assertNotEqual(AttributeValidationTool.validate('port', -80), True) + self.assertNotEqual(AttributeValidationTool.validate('port', '-80'), True) + + def test_validate_ssdeep(self): + self._should_be_valid('ssdeep', 
"768:+OFu8Q3w6QzfR5Jni6SQD7qSFDs6P93/q0XIc/UB5EPABWX:RFu8QAFzffJui79f13/AnB5EPAkX") + self._should_be_invalid('ssdeep', "768:+OFu8Q3w6QzfR5Jni6SQD7qSFDs6P93/q0XIc/UB5EPABWX\n\n:RFu8QAFzffJui79f13/AnB5EPAkX") + + def test_validate_ssh_fingerprint(self): + self._should_be_valid( + 'ssh-fingerprint', + '7b:e5:6f:a7:f4:f9:81:62:5c:e3:1f:bf:8b:57:6c:5a', + 'MD5:7b:e5:6f:a7:f4:f9:81:62:5c:e3:1f:bf:8b:57:6c:5a', + 'SHA256:mVPwvezndPv/ARoIadVY98vAC0g+P/5633yTC4d/wXE', + ) + + def test_validate_event(self): + event_dict = { + 'Event': { + 'info': 'Test Event', + 'Attribute': [ + {'type': 'ip-src', 'value': '1.1.1.1'}, # Valid + {'type': 'ip-src', 'value': '999.999.999.999'}, # Invalid + {'type': 'domain', 'value': 'google.com'}, # Valid + {'type': 'md5', 'value': 'invalid_md5'}, # Invalid + {'type': 'AS', 'value': '1.1'} # modified and valid + ], + 'Object': [ + { + 'name': 'file', + 'Attribute': [ + {'type': 'filename', 'object_relation': 'filename', 'value': 'test.txt'}, # Valid + {'type': 'md5', 'object_relation': 'md5', 'value': '0cc175b9c0f1b6a831c399e269772661'}, # Valid + {'type': 'md5', 'object_relation': 'md5', 'value': 'invalid_md5'}, # Invalid + ] + } + ] + } + } + + # Run validation + validated_event = validate_event(event_dict) + + # Check Attributes + self.assertEqual(len(validated_event.attributes), 3) + self.assertEqual(validated_event.attributes[0].value, '1.1.1.1') + self.assertEqual(validated_event.attributes[1].value, 'google.com') + self.assertEqual(validated_event.attributes[2].value, 65537) + + # Check Objects + self.assertEqual(len(validated_event.objects), 1) + self.assertEqual(len(validated_event.objects[0].attributes), 2) + self.assertEqual(validated_event.objects[0].attributes[0].value, 'test.txt') + self.assertEqual(validated_event.objects[0].attributes[1].value, '0cc175b9c0f1b6a831c399e269772661') From 56807b68475ab4902eda887fea37274604c8ec4b Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 20 Nov 2025 20:09:48 +0100 Subject: [PATCH 
12/26] fix: [tools] Trying to make nosetests happy (?) --- pymisp/tools/attributevalidationtool.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index a3fa0702a..710e35bb5 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -633,8 +633,8 @@ def _normalise_ip(value): @classmethod def _refang_value(cls, attribute_type, value): for rule in REFANG_REGEX_TABLE: - if attribute_type in rule['types']: - value = rule['from'].sub(rule['to'], value) + if attribute_type in rule['types']: # type: ignore + value = rule['from'].sub(rule['to'], value) # type: ignore return cls._handle_4byte_unicode(value) @classmethod @@ -669,7 +669,7 @@ def _validate_ip(cls, value): return True -def validate_event(event: dict | MISPEvent) -> MISPEvent: +def validate_event(event: dict | MISPEvent) -> MISPEvent: # type: ignore """ Validate event attributes and skip/remove any that don't validate. Replicates MISP server-side validation behavior. 
@@ -690,7 +690,7 @@ def validate_event(event: dict | MISPEvent) -> MISPEvent: return event -def _load_misp_event(event: dict) -> MISPEvent: +def _load_misp_event(event: dict) -> MISPEvent: # type: ignore misp_event = MISPEvent() misp_event.from_dict(**event) return misp_event @@ -703,7 +703,7 @@ def _message_logging(validated: str, attribute: MISPAttribute, misp_object: MISP return f'{message}:\n{attribute.value} - {validated}' -def _validate_attributes(attributes: list) -> Generator: +def _validate_attributes(attributes: list) -> Generator: # type: ignore for attribute in attributes: value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) validated = AttributeValidationTool.validate(attribute.type, value) @@ -714,7 +714,7 @@ def _validate_attributes(attributes: list) -> Generator: yield attribute -def _validate_object_attributes(misp_object: MISPObject) -> Generator: +def _validate_object_attributes(misp_object: MISPObject) -> Generator: # type: ignore for attribute in misp_object.attributes: value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) validated = AttributeValidationTool.validate(attribute.type, value) From 7a2dc7d984eed324eec08bc9a9a685c29eeb4542 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Fri, 21 Nov 2025 10:19:37 +0100 Subject: [PATCH 13/26] fix: [tools] Removed `;` that were copy pasted from php by mistake --- pymisp/tools/attributevalidationtool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 710e35bb5..15823e794 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -403,7 +403,7 @@ def validate(cls, attribute_type, value): return ( f'{attribute_type.capitalize()} has an invalid format. ' 'Please double check the value or select type "other".' 
- ); + ) case 'hostname|port': composite = value.split('|') if len(composite) != 2: @@ -430,7 +430,7 @@ def validate(cls, attribute_type, value): return ( 'Email address has an invalid format. Please double ' 'check the value or select type "other".' - ); + ) case 'vulnerability': if VULNERABILITY_RE.fullmatch(value): return True From 82e43551a29e798fa3524ed3281d14ebd3eb77e4 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Fri, 21 Nov 2025 17:47:01 +0100 Subject: [PATCH 14/26] fix: [tools] making codefactor happy by removing an `except .. pass` --- pymisp/tools/attributevalidationtool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 15823e794..01b56f2f0 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -235,7 +235,7 @@ def modifyBeforeValidation(cls, attribute_type, value): try: return parse(value) except Exception: - pass + return value return value case 'AS': if value.upper().startswith('AS'): From 480eea737cfc5e603dc782b3299410f49385d70b Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Mon, 24 Nov 2025 18:10:20 +0100 Subject: [PATCH 15/26] fix: [tools] Populating validation error messages to make them available outside of the single cli scope --- pymisp/tools/attributevalidationtool.py | 30 ++++++++++++++++++------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 01b56f2f0..c991fc51f 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -669,7 +669,8 @@ def _validate_ip(cls, value): return True -def validate_event(event: dict | MISPEvent) -> MISPEvent: # type: ignore +def validate_event(event: dict | MISPEvent, + errors: dict[str, list[str]]) -> MISPEvent: # type: ignore """ Validate event attributes and skip/remove any that don't validate. 
Replicates MISP server-side validation behavior. @@ -681,12 +682,14 @@ def validate_event(event: dict | MISPEvent) -> MISPEvent: # type: ignore if isinstance(event, dict): event = _load_misp_event(event) # Validation of Attributes - event.attributes = list(_validate_attributes(event.attributes)) + event.attributes = list(_validate_attributes(event.attributes, errors)) # Validation of Objects for misp_object in event.objects: - misp_object.attributes = list(_validate_object_attributes(misp_object)) + misp_object.attributes = list(_validate_object_attributes(misp_object, errors)) except Exception as e: - logger.error(f'Failed to validate event: {e}') + message = f'Failed to validate event: {e}' + logger.error(message) + _populate_error_message(errors, 'errors', message) return event @@ -703,23 +706,34 @@ def _message_logging(validated: str, attribute: MISPAttribute, misp_object: MISP return f'{message}:\n{attribute.value} - {validated}' -def _validate_attributes(attributes: list) -> Generator: # type: ignore +def _populate_error_message(errors: dict[str, list[str]], key: str, message: str) -> None: + try: + errors[key].append(message) + except KeyError: + errors[key] = [message] + + +def _validate_attributes(attributes: list, errors: dict[str, list[str]]) -> Generator: # type: ignore for attribute in attributes: value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) validated = AttributeValidationTool.validate(attribute.type, value) if validated is not True: - logger.warning(_message_logging(validated, attribute)) + message = _message_logging(validated, attribute) + logger.warning(message) + _populate_error_message(errors, 'warnings', message) continue attribute.value = value yield attribute -def _validate_object_attributes(misp_object: MISPObject) -> Generator: # type: ignore +def _validate_object_attributes(misp_object: MISPObject, errors: dict[str, list[str]]) -> Generator: # type: ignore for attribute in misp_object.attributes: 
value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) validated = AttributeValidationTool.validate(attribute.type, value) if validated is not True: - logger.warning(_message_logging(validated, attribute, misp_object)) + message = _message_logging(validated, attribute, misp_object) + logger.warning(message) + _populate_error_message(errors, 'warnings', message) continue attribute.value = value yield attribute From 87798724881234eba64288222a726fd6927294f7 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Tue, 25 Nov 2025 12:16:10 +0100 Subject: [PATCH 16/26] chg: [tests] Checking that validation error messages are populated as expected --- tests/test_attributevalidationtool.py | 29 ++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/tests/test_attributevalidationtool.py b/tests/test_attributevalidationtool.py index 937f91001..1913e07fa 100644 --- a/tests/test_attributevalidationtool.py +++ b/tests/test_attributevalidationtool.py @@ -1,4 +1,5 @@ import unittest +from collections import defaultdict from pymisp.tools import AttributeValidationTool, validate_event class TestAttributeValidationTool(unittest.TestCase): @@ -273,16 +274,30 @@ def test_validate_event(self): } # Run validation - validated_event = validate_event(event_dict) + validated_event = validate_event(event_dict, errors := defaultdict(list)) # Check Attributes self.assertEqual(len(validated_event.attributes), 3) - self.assertEqual(validated_event.attributes[0].value, '1.1.1.1') - self.assertEqual(validated_event.attributes[1].value, 'google.com') - self.assertEqual(validated_event.attributes[2].value, 65537) + ip_attribute, domain_attribute, as_attribute = validated_event.attributes + self.assertEqual(ip_attribute.value, '1.1.1.1') + self.assertEqual(domain_attribute.value, 'google.com') + self.assertEqual(as_attribute.value, 65537) # Check Objects self.assertEqual(len(validated_event.objects), 1) - 
self.assertEqual(len(validated_event.objects[0].attributes), 2) - self.assertEqual(validated_event.objects[0].attributes[0].value, 'test.txt') - self.assertEqual(validated_event.objects[0].attributes[1].value, '0cc175b9c0f1b6a831c399e269772661') + file_object = validated_event.objects[0] + self.assertEqual(file_object.name, 'file') + self.assertEqual(len(file_object.attributes), 2) + filename_attribute, md5_attribute = file_object.attributes + self.assertEqual(filename_attribute.value, 'test.txt') + self.assertEqual(md5_attribute.value, '0cc175b9c0f1b6a831c399e269772661') + + # Check Errors + self.assertEqual(len(errors['warnings']), 3) + ip_error, *md5_errors = errors['warnings'] + self.assertIn('IP address has an invalid format.', ip_error) + for md5_error in md5_errors: + self.assertIn( + 'Checksum has an invalid length or format (expected: 32 hexadecimal characters).', + md5_error + ) From 705457457f6883aed2af49878e47033c5870be71 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Tue, 25 Nov 2025 13:14:03 +0100 Subject: [PATCH 17/26] fix: [tools] Avoiding typing warnings --- tests/test_attributevalidationtool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_attributevalidationtool.py b/tests/test_attributevalidationtool.py index 1913e07fa..77f054d32 100644 --- a/tests/test_attributevalidationtool.py +++ b/tests/test_attributevalidationtool.py @@ -274,7 +274,7 @@ def test_validate_event(self): } # Run validation - validated_event = validate_event(event_dict, errors := defaultdict(list)) + validated_event = validate_event(event_dict, errors := defaultdict(list)) # type: ignore # Check Attributes self.assertEqual(len(validated_event.attributes), 3) From bece4fda416387c86877205731fb829ce311b5d1 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Tue, 25 Nov 2025 13:22:13 +0100 Subject: [PATCH 18/26] fix: [tools] Making nosetests happy with typings... 
--- pymisp/tools/attributevalidationtool.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index c991fc51f..882785a21 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -669,8 +669,7 @@ def _validate_ip(cls, value): return True -def validate_event(event: dict | MISPEvent, - errors: dict[str, list[str]]) -> MISPEvent: # type: ignore +def validate_event(event: dict | MISPEvent, errors: dict[str, list[str]]) -> MISPEvent: # type: ignore """ Validate event attributes and skip/remove any that don't validate. Replicates MISP server-side validation behavior. From 1840621f4985588e7797c16aa46bb0791d511182 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Wed, 26 Nov 2025 18:29:21 +0100 Subject: [PATCH 19/26] fix: [tools] Avoiding validation to fail on `datetime` attributes being already a datetime object --- pymisp/tools/attributevalidationtool.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 882785a21..7569a5d8b 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -496,6 +496,8 @@ def validate(cls, attribute_type, value): return True return 'SSH fingerprint must be in MD5 or SHA256 format.' 
case 'datetime': + if isinstance(value, datetime): + return True try: parse(value) return True From 63cfe001377668c9f41a6b73b5720004b1b8036e Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Wed, 26 Nov 2025 18:31:30 +0100 Subject: [PATCH 20/26] add: [tools] Added methods to validate attribute(s) or object(s) individually --- pymisp/tools/__init__.py | 8 +- pymisp/tools/attributevalidationtool.py | 161 +++++++++++++++++++----- 2 files changed, 137 insertions(+), 32 deletions(-) diff --git a/pymisp/tools/__init__.py b/pymisp/tools/__init__.py index 54b13e65c..5e41f27b1 100644 --- a/pymisp/tools/__init__.py +++ b/pymisp/tools/__init__.py @@ -13,7 +13,9 @@ from .asnobject import ASNObject # noqa from .geolocationobject import GeolocationObject # noqa from .git_vuln_finder_object import GitVulnFinderObject # noqa -from .attributevalidationtool import AttributeValidationTool, validate_event # noqa +from .attributevalidationtool import ( #noqa + AttributeValidationTool, validate_attribute, validate_attributes, + validate_event, validate_object, validate_objects, ValidationError) from .vehicleobject import VehicleObject # noqa from .csvloader import CSVLoader # noqa @@ -53,5 +55,7 @@ 'SSHAuthorizedKeysObject', 'feed_meta_generator', 'update_objects', 'EMailObject', 'URLObject', 'PEObject', 'PESectionObject', 'ELFObject', 'ELFSectionObject', 'MachOObject', 'MachOSectionObject', - 'AttributeValidationTool', 'validate_event' + 'AttributeValidationTool', 'validate_attribute', 'validate_attributes', + 'validate_event', 'validate_object', 'validate_objects', + 'ValidationError' ] diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 7569a5d8b..79bd59037 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -8,6 +8,7 @@ from datetime import datetime from dateutil.parser import parse from pymisp import MISPAttribute, MISPEvent, MISPObject +from pymisp.exceptions import PyMISPError 
from typing import Generator from urllib.parse import urlparse @@ -125,6 +126,10 @@ logger = logging.getLogger('pymisp') +class ValidationError(PyMISPError): + pass + + class AttributeValidationTool: @classmethod def modifyBeforeValidation(cls, attribute_type, value): @@ -671,35 +676,138 @@ def _validate_ip(cls, value): return True -def validate_event(event: dict | MISPEvent, errors: dict[str, list[str]]) -> MISPEvent: # type: ignore +def validate_attribute(attribute: dict | MISPAttribute) -> MISPAttribute: # type: ignore """ - Validate event attributes and skip/remove any that don't validate. - Replicates MISP server-side validation behavior. + Validates a MISP Attribute and returns a MISPAttribute if valid. + Replicates MISP server-side validation behavior on Attributes. - :param event: MISPEvent object or dict representing an event - :return: MISPEvent with only valid attributes + :param attribute: dict or MISPAttribute to validate + :return: Validated MISPAttribute object + :raises PyMISPError: If the attribute cannot be loaded or a validation error occurs + :raises ValidationError: If the attribute is invalid """ + if isinstance(attribute, dict): + try: + attribute = _load_misp_attribute(attribute) + except Exception as e: + message = f'Error loading Attribute: {e}' + logger.error(message) + raise PyMISPError(message) try: - if isinstance(event, dict): - event = _load_misp_event(event) - # Validation of Attributes - event.attributes = list(_validate_attributes(event.attributes, errors)) - # Validation of Objects - for misp_object in event.objects: - misp_object.attributes = list(_validate_object_attributes(misp_object, errors)) + value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) + validated = AttributeValidationTool.validate(attribute.type, value) except Exception as e: - message = f'Failed to validate event: {e}' + message = f'Error validating Attribute <{attribute.uuid}>: {e}' logger.error(message) - 
_populate_error_message(errors, 'errors', message) + raise PyMISPError(message) + if validated is not True: + message = _message_logging(validated, attribute) + logger.warning(message) + raise ValidationError(message) + attribute.value = value + return attribute + + +def validate_attributes(attributes: list, errors: dict) -> Generator: # type: ignore + """ + Validates a list of MISP attributes and skips any that doesn't validate. + + :param attributes: List of MISPAttribute objects + :param errors: Dictionary to populate with any validation error messages + :return: Generator yielding only valid MISPAttribute objects + """ + for attribute in attributes: + try: + misp_attribute = validate_attribute(attribute) + except ValidationError as e: + _populate_error_message(errors, 'warnings', str(e)) + continue + except PyMISPError as e: + _populate_error_message(errors, 'errors', str(e)) + continue + yield misp_attribute + + +def validate_event(event: dict | MISPEvent, errors: dict) -> MISPEvent: # type: ignore + """ + Validates an event and skips Attributes or Object Attributes that don't validate. + + :param event: MISPEvent object or dict representing an event + :param errors: Dictionary to populate with any validation error messages + :return: MISPEvent with only valid attributes + :raises PyMISPError: If the event cannot be loaded + """ + if isinstance(event, dict): + try: + event = _load_misp_event(event) + except Exception as e: + message = f'Error loading Event: {e}' + logger.error(message) + raise PyMISPError(message) + # Validation of Attributes + event.attributes = list(validate_attributes(event.attributes, errors)) + # Validation of Objects + event.objects = list(validate_objects(event.objects, errors)) return event +def validate_object(misp_object: dict | MISPObject, errors: dict) -> MISPObject: # type: ignore + """ + Validates an object and skips any Object Attribute that doesn't validate. 
+ + :param misp_object: MISPObject object or dict representing an object + :param errors: Dictionary to populate with any validation error messages + :return: MISPObject with only valid attributes + :raises PyMISPError: If the object cannot be loaded + """ + if isinstance(misp_object, dict): + try: + misp_object = _load_misp_object(misp_object) + except Exception as e: + message = f'Error loading Object: {e}' + logger.error(message) + raise PyMISPError(message) + # Validation of Object Attributes + misp_object.attributes = list(_validate_object_attributes(misp_object, errors)) + return misp_object + + +def validate_objects(misp_objects: list, errors: dict) -> Generator: # type: ignore + """ + Validates a list of MISP objects and skips any Object Attribute that + doesn't validate. + + :param misp_objects: List of MISPObject objects + :param errors: Dictionary to populate with any validation error messages + :return: Generator yielding only valid MISPObject objects + """ + for mispObject in misp_objects: + try: + misp_object = validate_object(mispObject, errors) + except PyMISPError as e: + _populate_error_message(errors, 'errors', str(e)) + continue + yield misp_object + + +def _load_misp_attribute(attribute: dict) -> MISPAttribute: # type: ignore + misp_attribute = MISPAttribute() + misp_attribute.from_dict(**attribute) + return misp_attribute + + def _load_misp_event(event: dict) -> MISPEvent: # type: ignore misp_event = MISPEvent() misp_event.from_dict(**event) return misp_event +def _load_misp_object(mispObject: dict) -> MISPObject: # type: ignore + misp_object = MISPObject(mispObject['name']) + misp_object.from_dict(**mispObject) + return misp_object + + def _message_logging(validated: str, attribute: MISPAttribute, misp_object: MISPObject | None = None) -> str: message = f'Failed validation for {attribute.type} Attribute <{attribute.uuid}>' if misp_object is not None: @@ -714,23 +822,16 @@ def _populate_error_message(errors: dict[str, list[str]], key: str, 
message: str errors[key] = [message] -def _validate_attributes(attributes: list, errors: dict[str, list[str]]) -> Generator: # type: ignore - for attribute in attributes: - value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) - validated = AttributeValidationTool.validate(attribute.type, value) - if validated is not True: - message = _message_logging(validated, attribute) - logger.warning(message) - _populate_error_message(errors, 'warnings', message) - continue - attribute.value = value - yield attribute - - -def _validate_object_attributes(misp_object: MISPObject, errors: dict[str, list[str]]) -> Generator: # type: ignore +def _validate_object_attributes(misp_object: MISPObject, errors: dict) -> Generator: # type: ignore for attribute in misp_object.attributes: - value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) - validated = AttributeValidationTool.validate(attribute.type, value) + try: + value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) + validated = AttributeValidationTool.validate(attribute.type, value) + except Exception as e: + message = f'Error validating Object Attribute <{attribute.uuid}> in Object <{misp_object.uuid}>: {e}' + logger.error(message) + _populate_error_message(errors, 'errors', message) + continue if validated is not True: message = _message_logging(validated, attribute, misp_object) logger.warning(message) From 598f0d04aa9f20effc8123afdccbcaaa4d9d02f4 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Wed, 26 Nov 2025 20:10:55 +0100 Subject: [PATCH 21/26] add: [tests] Tests to check the latest methods added to validate Attributes and Objects --- tests/test_attributevalidationtool.py | 117 +++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 2 deletions(-) diff --git a/tests/test_attributevalidationtool.py b/tests/test_attributevalidationtool.py index 77f054d32..b8d28ee69 100644 --- 
a/tests/test_attributevalidationtool.py +++ b/tests/test_attributevalidationtool.py @@ -1,6 +1,11 @@ import unittest from collections import defaultdict -from pymisp.tools import AttributeValidationTool, validate_event +from datetime import datetime +from pymisp import MISPAttribute, MISPObject +from pymisp.tools import ( + AttributeValidationTool, validate_attribute, validate_attributes, + validate_event, validate_object, validate_objects, ValidationError +) class TestAttributeValidationTool(unittest.TestCase): @@ -188,7 +193,7 @@ def test_validate_misc(self): self._should_be_invalid('hex', 'zz') self._should_be_valid('datetime', '2020-01-01T00:00:00') - self._should_be_invalid('datetime', 'invalid') + self._should_be_invalid('datetime', '2020:01:01 00-00-00') self._should_be_valid('size-in-bytes', '1024', 1024) self._should_be_invalid('size-in-bytes', '-1', 'abc') @@ -301,3 +306,111 @@ def test_validate_event(self): 'Checksum has an invalid length or format (expected: 32 hexadecimal characters).', md5_error ) + + def test_validate_attribute(self): + # Test with valid dict + attribute_dict = {'type': 'ip-src', 'value': '1.1.1.1'} + validated = validate_attribute(attribute_dict) + self.assertIsInstance(validated, MISPAttribute) + self.assertEqual(validated.value, '1.1.1.1') + + # Test with valid MISPAttribute + attribute = MISPAttribute() + attribute.from_dict(**attribute_dict) + validated = validate_attribute(attribute) + self.assertIsInstance(validated, MISPAttribute) + self.assertEqual(validated.value, '1.1.1.1') + + # Test with invalid dict + invalid_dict = {'type': 'ip-src', 'value': '999.999.999.999'} + with self.assertRaises(ValidationError) as cm: + validate_attribute(invalid_dict) + self.assertIn('IP address has an invalid format.', str(cm.exception)) + + # Test with invalid MISPAttribute + invalid_attribute = MISPAttribute() + invalid_attribute.from_dict(**invalid_dict) + with self.assertRaises(ValidationError) as cm: + 
validate_attribute(invalid_attribute) + self.assertIn('IP address has an invalid format.', str(cm.exception)) + + # Test modification + modified_dict = {'type': 'AS', 'value': 'AS123'} + validated = validate_attribute(modified_dict) + self.assertEqual(validated.value, '123') + + def test_validate_attributes(self): + attributes = [ + {'type': 'ip-src', 'value': '1.1.1.1'}, # Valid + {'type': 'ip-src', 'value': '999.999.999.999'}, # Invalid + {'type': 'domain', 'value': 'google.com'} # Valid + ] + + valid_attributes = list(validate_attributes(attributes, errors := defaultdict(list))) + + self.assertEqual(len(valid_attributes), 2) + self.assertEqual(valid_attributes[0].value, '1.1.1.1') + self.assertEqual(valid_attributes[1].value, 'google.com') + + self.assertEqual(len(errors['warnings']), 1) + self.assertIn('IP address has an invalid format.', errors['warnings'][0]) + + def test_validate_object(self): + object_dict = { + 'name': 'file', + 'Attribute': [ + {'type': 'filename', 'object_relation': 'filename', 'value': 'test.txt'}, # Valid + {'type': 'md5', 'object_relation': 'md5', 'value': 'invalid_md5'} # Invalid + ] + } + + # Test with dict + validated_object = validate_object(object_dict, errors := {}) + self.assertIsInstance(validated_object, MISPObject) + self.assertEqual(len(validated_object.attributes), 1) + self.assertEqual(validated_object.attributes[0].value, 'test.txt') + self.assertEqual(len(errors['warnings']), 1) + self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) + + # Test with MISPObject + misp_object = MISPObject('file') + misp_object.from_dict(**object_dict) + validated_object = validate_object(misp_object, errors := {}) + self.assertEqual(len(validated_object.attributes), 1) + self.assertEqual(validated_object.attributes[0].value, 'test.txt') + self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) + + def test_validate_objects(self): + objects = [ + { + 'name': 'file', + 'Attribute': [ + 
{'type': 'filename', 'object_relation': 'filename', 'value': 'test.txt'}, + {'type': 'md5', 'object_relation': 'md5', 'value': 'invalid_md5'} + ] + }, + { + 'name': 'x509', + 'Attribute': [ + {'type': 'x509-fingerprint-md5', 'object_relation': 'x509-fingerprint-md5', 'value': 'b2a5abfeef9e36964281a31e17b57c97'}, + {'type': 'datetime', 'object_relation': 'validity-not-before', 'value': '2022-01-01T00:00:00'} + ] + } + ] + + errors = defaultdict(list) + valid_objects = list(validate_objects(objects, errors)) + + self.assertEqual(len(valid_objects), 2) + file_object, x509_object = valid_objects + # First object should have 1 attribute (1 filtered out) + self.assertEqual(len(file_object.attributes), 1) + self.assertEqual(file_object.attributes[0].value, 'test.txt') + self.assertEqual(len(errors['warnings']), 1) + self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) + + # Second object should have 2 attributes + self.assertEqual(len(x509_object.attributes), 2) + self.assertEqual(x509_object.attributes[0].value, 'b2a5abfeef9e36964281a31e17b57c97') + validity = x509_object.attributes[1].value + self.assertEqual(x509_object.attributes[1].value, datetime(2022, 1, 1, 0, 0, 0)) From ebd655f4705f7d25518830d5ff69d77654abe390 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 27 Nov 2025 16:17:38 +0100 Subject: [PATCH 22/26] fix: [tests] Typings again... 
--- tests/test_attributevalidationtool.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_attributevalidationtool.py b/tests/test_attributevalidationtool.py index b8d28ee69..7511b3aa6 100644 --- a/tests/test_attributevalidationtool.py +++ b/tests/test_attributevalidationtool.py @@ -346,7 +346,7 @@ def test_validate_attributes(self): {'type': 'domain', 'value': 'google.com'} # Valid ] - valid_attributes = list(validate_attributes(attributes, errors := defaultdict(list))) + valid_attributes = list(validate_attributes(attributes, errors := defaultdict(list))) # type: ignore self.assertEqual(len(valid_attributes), 2) self.assertEqual(valid_attributes[0].value, '1.1.1.1') @@ -365,7 +365,7 @@ def test_validate_object(self): } # Test with dict - validated_object = validate_object(object_dict, errors := {}) + validated_object = validate_object(object_dict, errors := {}) # type: ignore self.assertIsInstance(validated_object, MISPObject) self.assertEqual(len(validated_object.attributes), 1) self.assertEqual(validated_object.attributes[0].value, 'test.txt') @@ -375,7 +375,7 @@ def test_validate_object(self): # Test with MISPObject misp_object = MISPObject('file') misp_object.from_dict(**object_dict) - validated_object = validate_object(misp_object, errors := {}) + validated_object = validate_object(misp_object, errors := {}) # type: ignore self.assertEqual(len(validated_object.attributes), 1) self.assertEqual(validated_object.attributes[0].value, 'test.txt') self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) @@ -398,8 +398,7 @@ def test_validate_objects(self): } ] - errors = defaultdict(list) - valid_objects = list(validate_objects(objects, errors)) + valid_objects = list(validate_objects(objects, errors := defaultdict(list))) # type: ignore self.assertEqual(len(valid_objects), 2) file_object, x509_object = valid_objects From bb4741073e086c64d217a08395bcc5f1461cc826 Mon Sep 17 00:00:00 2001 From: Christian 
Studer Date: Thu, 27 Nov 2025 16:26:22 +0100 Subject: [PATCH 23/26] fix: [tests] Making nosetests happy --- tests/test_attributevalidationtool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_attributevalidationtool.py b/tests/test_attributevalidationtool.py index 7511b3aa6..bf2afee69 100644 --- a/tests/test_attributevalidationtool.py +++ b/tests/test_attributevalidationtool.py @@ -375,7 +375,7 @@ def test_validate_object(self): # Test with MISPObject misp_object = MISPObject('file') misp_object.from_dict(**object_dict) - validated_object = validate_object(misp_object, errors := {}) # type: ignore + validated_object = validate_object(misp_object, errors := {}) self.assertEqual(len(validated_object.attributes), 1) self.assertEqual(validated_object.attributes[0].value, 'test.txt') self.assertIn('Checksum has an invalid length or format', errors['warnings'][0]) From 8c178468a08a534fec8874fc7b5210ac652fee39 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Mon, 8 Dec 2025 19:34:46 +0100 Subject: [PATCH 24/26] fix: [tools] Making sure the `edited` flag is not modified during the validation - As validation should not be a change per se, we do not want the `edited` flag to be set when we set the value field in attributes, or the `Attribute` field in MISP objects - This avoids `timestamp` fields to be skipped when using `.to_json()` --- pymisp/tools/attributevalidationtool.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 79bd59037..d6c880b98 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -693,6 +693,7 @@ def validate_attribute(attribute: dict | MISPAttribute) -> MISPAttribute: # typ message = f'Error loading Attribute: {e}' logger.error(message) raise PyMISPError(message) + is_edited = attribute.edited try: value = 
AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) validated = AttributeValidationTool.validate(attribute.type, value) @@ -704,7 +705,9 @@ def validate_attribute(attribute: dict | MISPAttribute) -> MISPAttribute: # typ message = _message_logging(validated, attribute) logger.warning(message) raise ValidationError(message) - attribute.value = value + if attribute.value != value: + attribute.value = value + attribute.edited = is_edited return attribute @@ -767,8 +770,10 @@ def validate_object(misp_object: dict | MISPObject, errors: dict) -> MISPObject: message = f'Error loading Object: {e}' logger.error(message) raise PyMISPError(message) + is_edited = misp_object.edited # Validation of Object Attributes misp_object.attributes = list(_validate_object_attributes(misp_object, errors)) + misp_object.edited = is_edited return misp_object @@ -824,6 +829,7 @@ def _populate_error_message(errors: dict[str, list[str]], key: str, message: str def _validate_object_attributes(misp_object: MISPObject, errors: dict) -> Generator: # type: ignore for attribute in misp_object.attributes: + is_edited = attribute.edited try: value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value) validated = AttributeValidationTool.validate(attribute.type, value) @@ -837,5 +843,7 @@ def _validate_object_attributes(misp_object: MISPObject, errors: dict) -> Genera logger.warning(message) _populate_error_message(errors, 'warnings', message) continue - attribute.value = value + if attribute.value != value: + attribute.value = value + attribute.edited = is_edited yield attribute From 88218f51bd6e621b5aef142bc5d8985560039578 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Tue, 9 Dec 2025 17:52:26 +0100 Subject: [PATCH 25/26] fix: [tools] Delegating the potential loading error handling to PyMISP - As PyMISP already handles properly the different possible types of input given to `from_dict`, it makes more sense here to test whether the type 
we're expecting is NOT one of the PyMISP classes --- pymisp/tools/attributevalidationtool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index d6c880b98..4401cbc92 100644 --- a/pymisp/tools/attributevalidationtool.py +++ b/pymisp/tools/attributevalidationtool.py @@ -686,7 +686,7 @@ def validate_attribute(attribute: dict | MISPAttribute) -> MISPAttribute: # typ :raises PyMISPError: If the attribute cannot be loaded or a validation error occurs :raises ValidationError: If the attribute is invalid """ - if isinstance(attribute, dict): + if not isinstance(attribute, MISPAttribute): try: attribute = _load_misp_attribute(attribute) except Exception as e: @@ -740,7 +740,7 @@ def validate_event(event: dict | MISPEvent, errors: dict) -> MISPEvent: # type: :return: MISPEvent with only valid attributes :raises PyMISPError: If the event cannot be loaded """ - if isinstance(event, dict): + if not isinstance(event, MISPEvent): try: event = _load_misp_event(event) except Exception as e: @@ -763,7 +763,7 @@ def validate_object(misp_object: dict | MISPObject, errors: dict) -> MISPObject: :return: MISPObject with only valid attributes :raises PyMISPError: If the object cannot be loaded """ - if isinstance(misp_object, dict): + if not isinstance(misp_object, MISPObject): try: misp_object = _load_misp_object(misp_object) except Exception as e: From dda5307ee17f7829cebcae728ee43498d4fa1106 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Mon, 5 Jan 2026 15:31:06 +0100 Subject: [PATCH 26/26] fix: [tools] Fixed regex for `pehash` attribute type validation - Was still containing a delimiter as in PHP --- pymisp/tools/attributevalidationtool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymisp/tools/attributevalidationtool.py b/pymisp/tools/attributevalidationtool.py index 4401cbc92..6c00ac606 100644 --- a/pymisp/tools/attributevalidationtool.py +++ 
b/pymisp/tools/attributevalidationtool.py @@ -321,7 +321,7 @@ def validate(cls, attribute_type, value): return True return 'Unknown HTTP method.' case 'filename|pehash': - if re.fullmatch(r'^.+\|[0-9a-f]{40}$#', value): + if re.fullmatch(r'^.+\|[0-9a-f]{40}$', value): return True return ( "The input doesn't match the expected filename|sha1 format "