Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ All versions prior to 0.9.0 are untracked.

## [Unreleased]

### Changed

* **BREAKING API CHANGE**: `sigstore.sign.SigningResult` has been removed
([#862](https://github.com/sigstore/sigstore-python/pull/862))
* **BREAKING API CHANGE**: The `Signer.sign(...)` API now returns a `Bundle`,
instead of a `SigningResult` ([#862](https://github.com/sigstore/sigstore-python/pull/862))

## [2.1.0]

### Added
Expand Down
4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ lint = [
# and let Dependabot periodically perform this update.
"ruff < 0.1.14",
"types-requests",
# TODO(ww): Re-enable once dependency on types-cryptography is dropped.
# See: https://github.com/python/typeshed/issues/8699
# "types-pyOpenSSL",
"types-pyOpenSSL",
]
doc = ["pdoc"]
dev = ["build", "bump >= 1.3.2", "sigstore[doc,test,lint]"]
Expand Down
15 changes: 9 additions & 6 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
RekorKeyring,
)
from sigstore._internal.trustroot import TrustedRoot
from sigstore._utils import PEMCert
from sigstore._utils import PEMCert, cert_der_to_pem
from sigstore.errors import Error
from sigstore.oidc import (
DEFAULT_OAUTH_ISSUER_URL,
Expand Down Expand Up @@ -698,10 +698,12 @@ def _sign(args: argparse.Namespace) -> None:
raise exp_certificate

print("Using ephemeral certificate:")
print(result.cert_pem)
cert = result.verification_material.x509_certificate_chain.certificates[0]
cert_pem = cert_der_to_pem(cert.raw_bytes)
print(cert_pem)

print(
f"Transparency log entry created at index: {result.log_entry.log_index}"
f"Transparency log entry created at index: {result.verification_material.tlog_entries[0].log_index}"
)

sig_output: TextIO
Expand All @@ -710,18 +712,19 @@ def _sign(args: argparse.Namespace) -> None:
else:
sig_output = sys.stdout

print(result.b64_signature, file=sig_output)
signature = base64.b64encode(result.message_signature.signature).decode()
print(signature, file=sig_output)
if outputs["sig"] is not None:
print(f"Signature written to {outputs['sig']}")

if outputs["cert"] is not None:
with outputs["cert"].open(mode="w") as io:
print(result.cert_pem, file=io)
print(cert_pem, file=io)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information

This expression logs [sensitive data (certificate)](1) as clear text. This expression logs [sensitive data (certificate)](1) as clear text.
print(f"Certificate written to {outputs['cert']}")

if outputs["bundle"] is not None:
with outputs["bundle"].open(mode="w") as io:
print(result.to_bundle().to_json(), file=io)
print(result.to_json(), file=io)
print(f"Sigstore bundle written to {outputs['bundle']}")


Expand Down
20 changes: 19 additions & 1 deletion sigstore/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.x509 import Certificate, ExtensionNotFound, Version
from cryptography.x509 import (
Certificate,
ExtensionNotFound,
Version,
load_der_x509_certificate,
)
from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID

from sigstore.errors import Error
Expand Down Expand Up @@ -126,6 +131,19 @@ def base64_encode_pem_cert(cert: Certificate) -> B64Str:
)


def cert_der_to_pem(der: bytes) -> str:
"""
Converts a DER-encoded X.509 certificate into its PEM encoding.

Returns a string containing a PEM-encoded X.509 certificate.
"""

# NOTE: Technically we don't have to round-trip like this, since
# the DER-to-PEM transformation is entirely mechanical.
cert = load_der_x509_certificate(der)
return cert.public_bytes(serialization.Encoding.PEM).decode()


def key_id(key: PublicKey) -> KeyID:
"""
Returns an RFC 6962-style "key ID" for the given public key.
Expand Down
133 changes: 51 additions & 82 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.x509.oid import NameOID
from pydantic import BaseModel
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
Bundle,
VerificationMaterial,
Expand Down Expand Up @@ -175,7 +174,7 @@ def _signing_cert(
def sign(
self,
input_: IO[bytes],
) -> SigningResult:
) -> Bundle:
"""Public API for signing blobs"""
input_digest = sha256_streaming(input_)
private_key = self._private_key
Expand Down Expand Up @@ -233,7 +232,7 @@ def sign(

logger.debug(f"Transparency log entry created with index: {entry.log_index}")

return SigningResult(
return _make_bundle(
input_digest=HexStr(input_digest.hex()),
cert_pem=PEMCert(
cert.public_bytes(encoding=serialization.Encoding.PEM).decode()
Expand Down Expand Up @@ -308,90 +307,60 @@ def signer(
yield Signer(identity_token, self, cache)


class SigningResult(BaseModel):
def _make_bundle(
input_digest: HexStr, cert_pem: PEMCert, b64_signature: B64Str, log_entry: LogEntry
) -> Bundle:
"""
Represents the artifacts of a signing operation.
Convert the raw results of a Sigstore signing operation into a Sigstore bundle.
"""

input_digest: HexStr
"""
The hex-encoded SHA256 digest of the input that was signed for.
"""

cert_pem: PEMCert
"""
The PEM-encoded public half of the certificate used for signing.
"""

b64_signature: B64Str
"""
The base64-encoded signature.
"""

log_entry: LogEntry
"""
A record of the Rekor log entry for the signing operation.
"""

def to_bundle(self) -> Bundle:
"""
Creates a Sigstore bundle (as defined by Sigstore's protobuf specs)
from this `SigningResult`.
"""

# NOTE: We explicitly only include the leaf certificate in the bundle's "chain"
# here: the specs explicitly forbid the inclusion of the root certificate,
# and discourage inclusion of any intermediates (since they're in the root of
# trust already).
cert = x509.load_pem_x509_certificate(self.cert_pem.encode())
cert_der = cert.public_bytes(encoding=serialization.Encoding.DER)
chain = X509CertificateChain(certificates=[X509Certificate(raw_bytes=cert_der)])

inclusion_proof: InclusionProof | None = None
if self.log_entry.inclusion_proof is not None:
inclusion_proof = InclusionProof(
log_index=self.log_entry.inclusion_proof.log_index,
root_hash=bytes.fromhex(self.log_entry.inclusion_proof.root_hash),
tree_size=self.log_entry.inclusion_proof.tree_size,
hashes=[
bytes.fromhex(h) for h in self.log_entry.inclusion_proof.hashes
],
checkpoint=Checkpoint(
envelope=self.log_entry.inclusion_proof.checkpoint
),
)

tlog_entry = TransparencyLogEntry(
log_index=self.log_entry.log_index,
log_id=LogId(key_id=bytes.fromhex(self.log_entry.log_id)),
kind_version=KindVersion(kind="hashedrekord", version="0.0.1"),
integrated_time=self.log_entry.integrated_time,
inclusion_promise=InclusionPromise(
signed_entry_timestamp=base64.b64decode(
self.log_entry.inclusion_promise
)
)
if self.log_entry.inclusion_promise
else None,
inclusion_proof=inclusion_proof,
canonicalized_body=base64.b64decode(self.log_entry.body),
# NOTE: We explicitly only include the leaf certificate in the bundle's "chain"
# here: the specs explicitly forbid the inclusion of the root certificate,
# and discourage inclusion of any intermediates (since they're in the root of
# trust already).
cert = x509.load_pem_x509_certificate(cert_pem.encode())
cert_der = cert.public_bytes(encoding=serialization.Encoding.DER)
chain = X509CertificateChain(certificates=[X509Certificate(raw_bytes=cert_der)])

inclusion_proof: InclusionProof | None = None
if log_entry.inclusion_proof is not None:
inclusion_proof = InclusionProof(
log_index=log_entry.inclusion_proof.log_index,
root_hash=bytes.fromhex(log_entry.inclusion_proof.root_hash),
tree_size=log_entry.inclusion_proof.tree_size,
hashes=[bytes.fromhex(h) for h in log_entry.inclusion_proof.hashes],
checkpoint=Checkpoint(envelope=log_entry.inclusion_proof.checkpoint),
)

material = VerificationMaterial(
x509_certificate_chain=chain,
tlog_entries=[tlog_entry],
tlog_entry = TransparencyLogEntry(
log_index=log_entry.log_index,
log_id=LogId(key_id=bytes.fromhex(log_entry.log_id)),
kind_version=KindVersion(kind="hashedrekord", version="0.0.1"),
integrated_time=log_entry.integrated_time,
inclusion_promise=InclusionPromise(
signed_entry_timestamp=base64.b64decode(log_entry.inclusion_promise)
)

bundle = Bundle(
media_type="application/vnd.dev.sigstore.bundle+json;version=0.2",
verification_material=material,
message_signature=MessageSignature(
message_digest=HashOutput(
algorithm=HashAlgorithm.SHA2_256,
digest=bytes.fromhex(self.input_digest),
),
signature=base64.b64decode(self.b64_signature),
if log_entry.inclusion_promise
else None,
inclusion_proof=inclusion_proof,
canonicalized_body=base64.b64decode(log_entry.body),
)

material = VerificationMaterial(
x509_certificate_chain=chain,
tlog_entries=[tlog_entry],
)

bundle = Bundle(
media_type="application/vnd.dev.sigstore.bundle+json;version=0.2",
verification_material=material,
message_signature=MessageSignature(
message_digest=HashOutput(
algorithm=HashAlgorithm.SHA2_256,
digest=bytes.fromhex(input_digest),
),
)
signature=base64.b64decode(b64_signature),
),
)

return bundle
return bundle
2 changes: 1 addition & 1 deletion sigstore/verify/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage
from cryptography.x509.oid import ExtendedKeyUsageOID
from OpenSSL.crypto import ( # type: ignore[import-untyped]
from OpenSSL.crypto import (
X509,
X509Store,
X509StoreContext,
Expand Down
4 changes: 3 additions & 1 deletion test/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import base64
import os
import re
Expand Down Expand Up @@ -238,7 +240,7 @@ def tuf_dirs(monkeypatch, tmp_path):
],
ids=["production", "staging"],
)
def id_config(request):
def id_config(request) -> tuple[SigningContext, IdentityToken]:
env, signer = request.param
# Detect env variable for local interactive tests.
token = os.getenv(f"SIGSTORE_IDENTITY_TOKEN_{env}")
Expand Down
15 changes: 7 additions & 8 deletions test/unit/test_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import io
import secrets

Expand Down Expand Up @@ -45,14 +46,13 @@ def test_sign_rekor_entry_consistent(id_config):

payload = io.BytesIO(secrets.token_bytes(32))
with ctx.signer(identity) as signer:
expected_entry = signer.sign(payload).log_entry
expected_entry = signer.sign(payload).verification_material.tlog_entries[0]

actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index)

assert expected_entry.uuid == actual_entry.uuid
assert expected_entry.body == actual_entry.body
assert expected_entry.canonicalized_body == base64.b64decode(actual_entry.body)
assert expected_entry.integrated_time == actual_entry.integrated_time
assert expected_entry.log_id == actual_entry.log_id
assert expected_entry.log_id.key_id == bytes.fromhex(actual_entry.log_id)
assert expected_entry.log_index == actual_entry.log_index


Expand Down Expand Up @@ -109,11 +109,10 @@ def test_identity_proof_claim_lookup(id_config, monkeypatch):
payload = io.BytesIO(secrets.token_bytes(32))

with ctx.signer(identity) as signer:
expected_entry = signer.sign(payload).log_entry
expected_entry = signer.sign(payload).verification_material.tlog_entries[0]
actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index)

assert expected_entry.uuid == actual_entry.uuid
assert expected_entry.body == actual_entry.body
assert expected_entry.canonicalized_body == base64.b64decode(actual_entry.body)
assert expected_entry.integrated_time == actual_entry.integrated_time
assert expected_entry.log_id == actual_entry.log_id
assert expected_entry.log_id.key_id == bytes.fromhex(actual_entry.log_id)
assert expected_entry.log_index == actual_entry.log_index