diff --git a/CHANGELOG.md b/CHANGELOG.md index e20cafa43..1a0706cca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,13 @@ All versions prior to 0.9.0 are untracked. ## [Unreleased] +### Changed + +* **BREAKING API CHANGE**: `sigstore.sign.SigningResult` has been removed + ([#862](https://github.com/sigstore/sigstore-python/pull/862)) +* **BREAKING API CHANGE**: The `Signer.sign(...)` API now returns a `Bundle`, + instead of a `SigningResult` ([#862](https://github.com/sigstore/sigstore-python/pull/862)) + ## [2.1.0] ### Added diff --git a/pyproject.toml b/pyproject.toml index f4232915e..c5225d816 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,9 +60,7 @@ lint = [ # and let Dependabot periodically perform this update. "ruff < 0.1.14", "types-requests", - # TODO(ww): Re-enable once dependency on types-cryptography is dropped. - # See: https://github.com/python/typeshed/issues/8699 - # "types-pyOpenSSL", + "types-pyOpenSSL", ] doc = ["pdoc"] dev = ["build", "bump >= 1.3.2", "sigstore[doc,test,lint]"] diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 0b08c09e6..da9714cba 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -41,7 +41,7 @@ RekorKeyring, ) from sigstore._internal.trustroot import TrustedRoot -from sigstore._utils import PEMCert +from sigstore._utils import PEMCert, cert_der_to_pem from sigstore.errors import Error from sigstore.oidc import ( DEFAULT_OAUTH_ISSUER_URL, @@ -698,10 +698,12 @@ def _sign(args: argparse.Namespace) -> None: raise exp_certificate print("Using ephemeral certificate:") - print(result.cert_pem) + cert = result.verification_material.x509_certificate_chain.certificates[0] + cert_pem = cert_der_to_pem(cert.raw_bytes) + print(cert_pem) print( - f"Transparency log entry created at index: {result.log_entry.log_index}" + f"Transparency log entry created at index: {result.verification_material.tlog_entries[0].log_index}" ) sig_output: TextIO @@ -710,18 +712,19 @@ def _sign(args: argparse.Namespace) -> None: else: sig_output = sys.stdout - print(result.b64_signature, file=sig_output) + signature = base64.b64encode(result.message_signature.signature).decode() + print(signature, file=sig_output) if outputs["sig"] is not None: print(f"Signature written to {outputs['sig']}") if outputs["cert"] is not None: with outputs["cert"].open(mode="w") as io: - print(result.cert_pem, file=io) + print(cert_pem, file=io) print(f"Certificate written to {outputs['cert']}") if outputs["bundle"] is not None: with outputs["bundle"].open(mode="w") as io: - print(result.to_bundle().to_json(), file=io) + print(result.to_json(), file=io) print(f"Sigstore bundle written to {outputs['bundle']}") diff --git a/sigstore/_utils.py b/sigstore/_utils.py index e6487c68f..0e4463566 100644 --- a/sigstore/_utils.py +++ b/sigstore/_utils.py @@ -25,7 +25,12 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec, rsa -from cryptography.x509 import Certificate, ExtensionNotFound, Version +from cryptography.x509 import ( + Certificate, + ExtensionNotFound, + Version, + load_der_x509_certificate, +) from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID from sigstore.errors import Error @@ -126,6 +131,19 @@ def base64_encode_pem_cert(cert: Certificate) -> B64Str: ) +def cert_der_to_pem(der: bytes) -> str: + """ + Converts a DER-encoded X.509 certificate into its PEM encoding. + + Returns a string containing a PEM-encoded X.509 certificate. + """ + + # NOTE: Technically we don't have to round-trip like this, since + # the DER-to-PEM transformation is entirely mechanical. + cert = load_der_x509_certificate(der) + return cert.public_bytes(serialization.Encoding.PEM).decode() + + def key_id(key: PublicKey) -> KeyID: """ Returns an RFC 6962-style "key ID" for the given public key. diff --git a/sigstore/sign.py b/sigstore/sign.py index b37ae98a6..e03d6fad9 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -51,7 +51,6 @@ from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric.utils import Prehashed from cryptography.x509.oid import NameOID -from pydantic import BaseModel from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( Bundle, VerificationMaterial, @@ -175,7 +174,7 @@ def _signing_cert( def sign( self, input_: IO[bytes], - ) -> SigningResult: + ) -> Bundle: """Public API for signing blobs""" input_digest = sha256_streaming(input_) private_key = self._private_key @@ -233,7 +232,7 @@ def sign( logger.debug(f"Transparency log entry created with index: {entry.log_index}") - return SigningResult( + return _make_bundle( input_digest=HexStr(input_digest.hex()), cert_pem=PEMCert( cert.public_bytes(encoding=serialization.Encoding.PEM).decode() @@ -308,90 +307,60 @@ def signer( yield Signer(identity_token, self, cache) -class SigningResult(BaseModel): +def _make_bundle( + input_digest: HexStr, cert_pem: PEMCert, b64_signature: B64Str, log_entry: LogEntry +) -> Bundle: """ - Represents the artifacts of a signing operation. + Convert the raw results of a Sigstore signing operation into a Sigstore bundle. """ - input_digest: HexStr - """ - The hex-encoded SHA256 digest of the input that was signed for. - """ - - cert_pem: PEMCert - """ - The PEM-encoded public half of the certificate used for signing. - """ - - b64_signature: B64Str - """ - The base64-encoded signature. - """ - - log_entry: LogEntry - """ - A record of the Rekor log entry for the signing operation. - """ - - def to_bundle(self) -> Bundle: - """ - Creates a Sigstore bundle (as defined by Sigstore's protobuf specs) - from this `SigningResult`. - """ - - # NOTE: We explicitly only include the leaf certificate in the bundle's "chain" - # here: the specs explicitly forbid the inclusion of the root certificate, - # and discourage inclusion of any intermediates (since they're in the root of - # trust already). - cert = x509.load_pem_x509_certificate(self.cert_pem.encode()) - cert_der = cert.public_bytes(encoding=serialization.Encoding.DER) - chain = X509CertificateChain(certificates=[X509Certificate(raw_bytes=cert_der)]) - - inclusion_proof: InclusionProof | None = None - if self.log_entry.inclusion_proof is not None: - inclusion_proof = InclusionProof( - log_index=self.log_entry.inclusion_proof.log_index, - root_hash=bytes.fromhex(self.log_entry.inclusion_proof.root_hash), - tree_size=self.log_entry.inclusion_proof.tree_size, - hashes=[ - bytes.fromhex(h) for h in self.log_entry.inclusion_proof.hashes - ], - checkpoint=Checkpoint( - envelope=self.log_entry.inclusion_proof.checkpoint - ), - ) - - tlog_entry = TransparencyLogEntry( - log_index=self.log_entry.log_index, - log_id=LogId(key_id=bytes.fromhex(self.log_entry.log_id)), - kind_version=KindVersion(kind="hashedrekord", version="0.0.1"), - integrated_time=self.log_entry.integrated_time, - inclusion_promise=InclusionPromise( - signed_entry_timestamp=base64.b64decode( - self.log_entry.inclusion_promise - ) - ) - if self.log_entry.inclusion_promise - else None, - inclusion_proof=inclusion_proof, - canonicalized_body=base64.b64decode(self.log_entry.body), + # NOTE: We explicitly only include the leaf certificate in the bundle's "chain" + # here: the specs explicitly forbid the inclusion of the root certificate, + # and discourage inclusion of any intermediates (since they're in the root of + # trust already). + cert = x509.load_pem_x509_certificate(cert_pem.encode()) + cert_der = cert.public_bytes(encoding=serialization.Encoding.DER) + chain = X509CertificateChain(certificates=[X509Certificate(raw_bytes=cert_der)]) + + inclusion_proof: InclusionProof | None = None + if log_entry.inclusion_proof is not None: + inclusion_proof = InclusionProof( + log_index=log_entry.inclusion_proof.log_index, + root_hash=bytes.fromhex(log_entry.inclusion_proof.root_hash), + tree_size=log_entry.inclusion_proof.tree_size, + hashes=[bytes.fromhex(h) for h in log_entry.inclusion_proof.hashes], + checkpoint=Checkpoint(envelope=log_entry.inclusion_proof.checkpoint), ) - material = VerificationMaterial( - x509_certificate_chain=chain, - tlog_entries=[tlog_entry], + tlog_entry = TransparencyLogEntry( + log_index=log_entry.log_index, + log_id=LogId(key_id=bytes.fromhex(log_entry.log_id)), + kind_version=KindVersion(kind="hashedrekord", version="0.0.1"), + integrated_time=log_entry.integrated_time, + inclusion_promise=InclusionPromise( + signed_entry_timestamp=base64.b64decode(log_entry.inclusion_promise) ) - - bundle = Bundle( - media_type="application/vnd.dev.sigstore.bundle+json;version=0.2", - verification_material=material, - message_signature=MessageSignature( - message_digest=HashOutput( - algorithm=HashAlgorithm.SHA2_256, - digest=bytes.fromhex(self.input_digest), - ), - signature=base64.b64decode(self.b64_signature), + if log_entry.inclusion_promise + else None, + inclusion_proof=inclusion_proof, + canonicalized_body=base64.b64decode(log_entry.body), + ) + + material = VerificationMaterial( + x509_certificate_chain=chain, + tlog_entries=[tlog_entry], + ) + + bundle = Bundle( + media_type="application/vnd.dev.sigstore.bundle+json;version=0.2", + verification_material=material, + message_signature=MessageSignature( + message_digest=HashOutput( + algorithm=HashAlgorithm.SHA2_256, + digest=bytes.fromhex(input_digest), ), - ) + signature=base64.b64decode(b64_signature), + ), + ) - return bundle + return bundle diff --git a/sigstore/verify/verifier.py b/sigstore/verify/verifier.py index 3477fab38..999e40b27 100644 --- a/sigstore/verify/verifier.py +++ b/sigstore/verify/verifier.py @@ -29,7 +29,7 @@ from cryptography.hazmat.primitives.asymmetric.utils import Prehashed from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage from cryptography.x509.oid import ExtendedKeyUsageOID -from OpenSSL.crypto import ( # type: ignore[import-untyped] +from OpenSSL.crypto import ( X509, X509Store, X509StoreContext, diff --git a/test/unit/conftest.py b/test/unit/conftest.py index fa71dd476..304167337 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import base64 import os import re @@ -238,7 +240,7 @@ def tuf_dirs(monkeypatch, tmp_path): ], ids=["production", "staging"], ) -def id_config(request): +def id_config(request) -> tuple[SigningContext, IdentityToken]: env, signer = request.param # Detect env variable for local interactive tests. token = os.getenv(f"SIGSTORE_IDENTITY_TOKEN_{env}") diff --git a/test/unit/test_sign.py b/test/unit/test_sign.py index a030a495e..ac6370ec7 100644 --- a/test/unit/test_sign.py +++ b/test/unit/test_sign.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import io import secrets @@ -45,14 +46,13 @@ def test_sign_rekor_entry_consistent(id_config): payload = io.BytesIO(secrets.token_bytes(32)) with ctx.signer(identity) as signer: - expected_entry = signer.sign(payload).log_entry + expected_entry = signer.sign(payload).verification_material.tlog_entries[0] actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index) - assert expected_entry.uuid == actual_entry.uuid - assert expected_entry.body == actual_entry.body + assert expected_entry.canonicalized_body == base64.b64decode(actual_entry.body) assert expected_entry.integrated_time == actual_entry.integrated_time - assert expected_entry.log_id == actual_entry.log_id + assert expected_entry.log_id.key_id == bytes.fromhex(actual_entry.log_id) assert expected_entry.log_index == actual_entry.log_index @@ -109,11 +109,10 @@ def test_identity_proof_claim_lookup(id_config, monkeypatch): payload = io.BytesIO(secrets.token_bytes(32)) with ctx.signer(identity) as signer: - expected_entry = signer.sign(payload).log_entry + expected_entry = signer.sign(payload).verification_material.tlog_entries[0] actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index) - assert expected_entry.uuid == actual_entry.uuid - assert expected_entry.body == actual_entry.body + assert expected_entry.canonicalized_body == base64.b64decode(actual_entry.body) assert expected_entry.integrated_time == actual_entry.integrated_time - assert expected_entry.log_id == actual_entry.log_id + assert expected_entry.log_id.key_id == bytes.fromhex(actual_entry.log_id) assert expected_entry.log_index == actual_entry.log_index