Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up certificate/CRL properties and decoder exceptions #89

Merged
merged 6 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project from version 0.9.3 onwards are documented in this file.

## 0.11.2 - 2024-07-16

### Fixes

- Gracefully handle mis-encoded extensions and fields exposed as properties (#88)

## 0.11.1 - 2024-07-02

### New features/enhancements
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2020-2023 DigiCert, Inc.
Copyright (c) 2020-2024 DigiCert, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.11.1
0.11.2
181 changes: 107 additions & 74 deletions pkilint/document.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import binascii
import datetime
import logging
import re
Expand All @@ -12,12 +11,10 @@
Choice, BitString
)


logger = logging.getLogger(__name__)

PATH_REGEX = re.compile(r'^((?P<doc_name>[^:]*):)?(?P<node_path>([^.]+\.)*[^.]+)?$')


try:
# noinspection PyUnresolvedReferences
from pyasn1_fasder import decode_der
Expand Down Expand Up @@ -50,7 +47,7 @@ def __str__(self) -> str:
)


class Document(object):
class Document:
"""Represents an ASN.1-encoded document."""

def __init__(
Expand Down Expand Up @@ -92,7 +89,7 @@ def __repr__(self):
return f'{self.root.name} document "{self.substrate_source}"'


class PDUNode(object):
class PDUNode:
"""Represents a node of a document."""

def __init__(self, document: Document, name: str, pdu: Asn1Type,
Expand Down Expand Up @@ -234,7 +231,7 @@ def __repr__(self):
return f'{self.pdu.__class__.__name__} @ {path}'


class NodeVisitor(object):
class NodeVisitor:
def __init__(self, *,
path: str = None,
path_re: re.Pattern = None,
Expand Down Expand Up @@ -263,21 +260,98 @@ def match(self, node: PDUNode) -> bool:
return True


class ValueDecodingFailedError(Exception):
def __init__(self, value_node: PDUNode, type_oid: ObjectIdentifier,
pdu_type: Optional[Asn1Type], message: str
):
self.value_node = value_node
self.type_oid = type_oid
self.pdu_type = pdu_type
def get_node_name_for_pdu(pdu: Asn1Type) -> str:
name = pdu.__class__.__name__
# convert PDU class name to camelCase
return name[0].lower() + name[1:]


class SubstrateDecodingFailedError(ValueError):
def __init__(
self, source_document: Document, pdu_instance: Optional[Asn1Type], parent_node: Optional[PDUNode],
message: Optional[str]
):
self.source_document = source_document
self.pdu_instance = pdu_instance
self.parent_node = parent_node
self.message = message

def __str__(self):
message = f'Error occurred while decoding substrate in document "{self.source_document.name}"'

if self.parent_node:
message += f' @ {self.parent_node.path}'

if self.pdu_instance:
message += f' using schema "{self.pdu_instance.__class__.__name__}"'

if self.message:
message += f': {self.message}'

return message


def decode_substrate(source_document: Document, substrate: bytes,
pdu_instance: Asn1Type, parent_node: Optional[PDUNode] = None) -> PDUNode:
if parent_node is not None and any(parent_node.children):
logger.debug("%s has child node; not creating new PDU node",
parent_node.path
)
return next(iter(parent_node.children.values()))

if _USE_PYASN1_FASDER:
try:
decoded, _ = decode_der(substrate, asn1Spec=pdu_instance)
except (ValueError, PyAsn1Error) as e:
raise SubstrateDecodingFailedError(source_document, pdu_instance, parent_node, str(e)) from e

decoded_pdu_name = get_node_name_for_pdu(decoded)
else:
try:
decoded, rest = decode(substrate, asn1Spec=pdu_instance)
except (ValueError, PyAsn1Error) as e:
raise SubstrateDecodingFailedError(source_document, pdu_instance, parent_node, str(e)) from e

decoded_pdu_name = get_node_name_for_pdu(decoded)
type_name = decoded.__class__.__name__

if len(rest) > 0:
rest_hex = bytes(rest).hex()

raise SubstrateDecodingFailedError(
source_document, pdu_instance, parent_node,
f'{len(rest)} unexpected octet(s) following "{type_name}" TLV: "{rest_hex}"'
)

try:
encoded = encode(decoded)

substrate_is_der = encoded == substrate
except (ValueError, PyAsn1Error):
substrate_is_der = False

if not substrate_is_der:
raise SubstrateDecodingFailedError(
source_document, pdu_instance, parent_node,
f'Substrate of type "{type_name}" is not DER-encoded'
)

node = PDUNode(source_document, decoded_pdu_name, decoded, parent_node)

if parent_node is not None:
parent_node.children[decoded_pdu_name] = node
logger.debug("Appended %s node to %s", node.name,
parent_node.path
)

return node


class OptionalAsn1TypeWrapper(NamedTuple):
asn1_type: Asn1Type


class ValueDecoder(object):
class ValueDecoder:
_BITSTRING_SCHEMA_OBJ = BitString()

VALUE_NODE_ABSENT = object()
Expand Down Expand Up @@ -319,16 +393,19 @@ def __call__(self, node):

# value node must be absent, but it exists
elif pdu_type is self.VALUE_NODE_ABSENT and value_node is not None:
raise ValueDecodingFailedError(
value_node, type_node.pdu, pdu_type,
'Value node is present, but the ASN.1 schema specifies that it must be absent'
raise SubstrateDecodingFailedError(
node.document, None, value_node,
f'Value node is present, but type OID {type_node.pdu} specifies that it must be absent'
)

# value node must be present, but it doesn't exist
elif pdu_type is not self.VALUE_NODE_ABSENT and value_node is None:
raise ValueDecodingFailedError(
node, type_node.pdu, pdu_type,
'Value node is absent, but the ASN.1 schema specifies that it must be present'
schema_name = pdu_type.__class__.__name__

raise SubstrateDecodingFailedError(
node.document, pdu_type, value_node,
f'Value node is absent, but type OID {type_node.pdu} specifies that a '
f'"{schema_name}" value must be present'
)

if pdu_type is self.VALUE_NODE_ABSENT or pdu_type is None:
Expand All @@ -337,69 +414,25 @@ def __call__(self, node):
value_octets = self.filter_value(node, type_node, value_node, pdu_type)

try:
decode_substrate(value_node.document, value_octets,
pdu_type, value_node
)
except (PyAsn1Error, ValueError) as e:
raise ValueDecodingFailedError(
value_node, type_node.pdu, pdu_type, str(e)
)
decode_substrate(value_node.document, value_octets, pdu_type, value_node)
except SubstrateDecodingFailedError as e:
schema_name = pdu_type.__class__.__name__

message = (
f'ASN.1 decoding failure occurred at "{value_node.path}" with schema "{schema_name}" corresponding to '
f'type OID {type_node.pdu}: {e.message}'
)

def get_node_name_for_pdu(pdu: Asn1Type) -> str:
name = pdu.__class__.__name__
# convert PDU class name to camelCase
return name[0].lower() + name[1:]
raise SubstrateDecodingFailedError(
e.source_document, e.pdu_instance, e.parent_node, message
) from e


def get_document_by_name(node: PDUNode, document_name: str) -> Document:
"""Retrieves the document with the specified name"""
return node.document.parent[document_name]


def decode_substrate(source_document: Document, substrate: bytes,
pdu_instance: Asn1Type, parent_node: Optional[PDUNode] = None) -> PDUNode:
if parent_node is not None and any(parent_node.children):
logger.debug("%s has child node; not creating new PDU node",
parent_node.path
)
return next(iter(parent_node.children.values()))

if _USE_PYASN1_FASDER:
decoded, _ = decode_der(substrate, asn1Spec=pdu_instance)

decoded_pdu_name = get_node_name_for_pdu(decoded)
else:
decoded, rest = decode(substrate, asn1Spec=pdu_instance)

decoded_pdu_name = get_node_name_for_pdu(decoded)

if len(rest) > 0:
raise ValueError(
"Unexpected {} octets following {} DER in {}: {}".format(
len(rest), decoded_pdu_name, source_document.substrate_source,
binascii.hexlify(rest).decode('us-ascii')
)
)

encoded = encode(decoded)
if encoded != substrate:
type_name = decoded.__class__.__name__
raise ValueError(
f'Substrate of type "{type_name}" is not DER-encoded'
)

node = PDUNode(source_document, decoded_pdu_name, decoded, parent_node)

if parent_node is not None:
parent_node.children[decoded_pdu_name] = node
logger.debug("Appended %s node to %s", node.name,
parent_node.path
)

return node


def get_re_for_path_glob(path_glob: str) -> re.Pattern:
return re.compile(
path_glob.replace('.', r'\.').replace('?', r'\w').replace('*', r'\w*')
Expand Down
3 changes: 3 additions & 0 deletions pkilint/pkix/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import enum
from typing import Optional

Expand All @@ -8,6 +9,8 @@
from pkilint.document import ValueDecoder
from pkilint.pkix import extension, algorithm, name

MAXIMUM_TIME_DATETIME = datetime.datetime(9999, 12, 31, 23, 59, 59, tzinfo=datetime.timezone.utc)


def create_attribute_decoder(type_mappings, decode_unknown_as_directorystring=True):
default = rfc5280.DirectoryString() if decode_unknown_as_directorystring else None
Expand Down
24 changes: 17 additions & 7 deletions pkilint/pkix/certificate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ def __init__(self, substrate_source, substrate,

@property
def not_before(self):
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notBefore')
)
try:
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notBefore')
)
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

@property
def not_after(self):
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notAfter')
)
try:
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notAfter')
)
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

def _decode_and_append_extension(
self, ext_oid: univ.ObjectIdentifier, ext_asn1_spec: Asn1Type) -> Optional[document.PDUNode]:
Expand All @@ -56,7 +62,11 @@ def _decode_and_append_extension(
ext, _ = ext_and_idx
ext_value = ext.children['extnValue']

return document.decode_substrate(self, ext_value.pdu.asOctets(), ext_asn1_spec, ext_value)
try:
return document.decode_substrate(self, ext_value.pdu.asOctets(), ext_asn1_spec, ext_value)
except ValueError:
# suppress decoding errors, which will be reported by DecodingValidator instances
return None

@functools.cached_property
def is_ca(self) -> bool:
Expand Down
13 changes: 9 additions & 4 deletions pkilint/pkix/crl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@ def __init__(self, substrate_source, substrate,

@property
def this_update(self):
return time.parse_time_node(
self.root.navigate('tbsCertificateList.thisUpdate')
)
try:
return time.parse_time_node(
self.root.navigate('tbsCertList.thisUpdate')
)
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

@property
def next_update(self):
try:
return time.parse_time_node(
self.root.navigate('tbsCertificateList.nextUpdate')
self.root.navigate('tbsCertList.nextUpdate')
)
except document.PDUNavigationFailedError:
return None
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

def get_extension_by_oid(self, oid):
tbs_crl = self.root.children['tbsCertList']
Expand Down
Loading