Skip to content

Commit 2835883

Browse files
authored
Separate TUF and trusted root management code (#844)
* Separate TUF and trusted root management code The purpose of this is to later enable both "--trust-root <FILE>" and some sort of offline functionality. * Trusted root can now be initialized from tuf, offline tuf or from a file * _internal.tuf module is now used only from the new trustroot module * Tests are modified to use the TrustRoot API now but they still (also) test the internal TUF implementation details * The new functionality (offline & from_file) is tested but is not exposed to UI * TrustUpdater now updates metadata when it is created (if not offline): This does not change application functionality as a online TrustUpdater is only created if a TUF update is needed Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
1 parent 7b8c910 commit 2835883

File tree

9 files changed

+289
-221
lines changed

9 files changed

+289
-221
lines changed

sigstore/_cli.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
RekorClient,
4141
RekorKeyring,
4242
)
43-
from sigstore._internal.tuf import TrustUpdater
43+
from sigstore._internal.trustroot import TrustedRoot
4444
from sigstore._utils import PEMCert
4545
from sigstore.errors import Error
4646
from sigstore.oidc import (
@@ -650,16 +650,16 @@ def _sign(args: argparse.Namespace) -> None:
650650
elif args.fulcio_url == DEFAULT_FULCIO_URL and args.rekor_url == DEFAULT_REKOR_URL:
651651
signing_ctx = SigningContext.production()
652652
else:
653-
# Assume "production" keys if none are given as arguments
654-
updater = TrustUpdater.production()
653+
# Assume "production" trust root if no keys are given as arguments
654+
trusted_root = TrustedRoot.production()
655655
if args.ctfe_pem is not None:
656656
ctfe_keys = [args.ctfe_pem.read()]
657657
else:
658-
ctfe_keys = updater.get_ctfe_keys()
658+
ctfe_keys = trusted_root.get_ctfe_keys()
659659
if args.rekor_root_pubkey is not None:
660660
rekor_keys = [args.rekor_root_pubkey.read()]
661661
else:
662-
rekor_keys = updater.get_rekor_keys()
662+
rekor_keys = trusted_root.get_rekor_keys()
663663

664664
ct_keyring = CTKeyring(Keyring(ctfe_keys))
665665
rekor_keyring = RekorKeyring(Keyring(rekor_keys))
@@ -828,8 +828,8 @@ def _collect_verification_state(
828828
if args.rekor_root_pubkey is not None:
829829
rekor_keys = [args.rekor_root_pubkey.read()]
830830
else:
831-
updater = TrustUpdater.production()
832-
rekor_keys = updater.get_rekor_keys()
831+
trusted_root = TrustedRoot.production()
832+
rekor_keys = trusted_root.get_rekor_keys()
833833

834834
verifier = Verifier(
835835
rekor=RekorClient(

sigstore/_internal/rekor/client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
from sigstore._internal.ctfe import CTKeyring
3131
from sigstore._internal.keyring import Keyring
32-
from sigstore._internal.tuf import TrustUpdater
32+
from sigstore._internal.trustroot import TrustedRoot
3333
from sigstore.transparency import LogEntry
3434

3535
logger = logging.getLogger(__name__)
@@ -232,14 +232,14 @@ def __del__(self) -> None:
232232
self.session.close()
233233

234234
@classmethod
235-
def production(cls, updater: TrustUpdater) -> RekorClient:
235+
def production(cls, trust_root: TrustedRoot) -> RekorClient:
236236
"""
237237
Returns a `RekorClient` populated with the default Rekor production instance.
238238
239-
updater must be a `TrustUpdater` for the production TUF repository.
239+
trust_root must be a `TrustedRoot` for the production TUF repository.
240240
"""
241-
rekor_keys = updater.get_rekor_keys()
242-
ctfe_keys = updater.get_ctfe_keys()
241+
rekor_keys = trust_root.get_rekor_keys()
242+
ctfe_keys = trust_root.get_ctfe_keys()
243243

244244
return cls(
245245
DEFAULT_REKOR_URL,
@@ -248,14 +248,14 @@ def production(cls, updater: TrustUpdater) -> RekorClient:
248248
)
249249

250250
@classmethod
251-
def staging(cls, updater: TrustUpdater) -> RekorClient:
251+
def staging(cls, trust_root: TrustedRoot) -> RekorClient:
252252
"""
253253
Returns a `RekorClient` populated with the default Rekor staging instance.
254254
255-
updater must be a `TrustUpdater` for the staging TUF repository.
255+
trust_root must be a `TrustedRoot` for the staging TUF repository.
256256
"""
257-
rekor_keys = updater.get_rekor_keys()
258-
ctfe_keys = updater.get_ctfe_keys()
257+
rekor_keys = trust_root.get_rekor_keys()
258+
ctfe_keys = trust_root.get_ctfe_keys()
259259

260260
return cls(
261261
STAGING_REKOR_URL,

sigstore/_internal/trustroot.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright 2023 The Sigstore Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Trust root management for sigstore-python.
17+
"""
18+
19+
from __future__ import annotations
20+
21+
from datetime import datetime, timezone
22+
from pathlib import Path
23+
from typing import Iterable
24+
25+
from cryptography.x509 import Certificate, load_der_x509_certificate
26+
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
27+
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
28+
CertificateAuthority,
29+
TransparencyLogInstance,
30+
)
31+
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
32+
TrustedRoot as _TrustedRoot,
33+
)
34+
35+
from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
36+
from sigstore.errors import MetadataError
37+
38+
39+
def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
40+
"""
41+
Given a `period`, checks that the the current time is not before `start`. If
42+
`allow_expired` is `False`, also checks that the current time is not after
43+
`end`.
44+
"""
45+
now = datetime.now(timezone.utc)
46+
47+
# If there was no validity period specified, the key is always valid.
48+
if not period:
49+
return True
50+
51+
# Active: if the current time is before the starting period, we are not yet
52+
# valid.
53+
if now < period.start:
54+
return False
55+
56+
# If we want Expired keys, the key is valid at this point. Otherwise, check
57+
# that we are within range.
58+
return allow_expired or (period.end is None or now <= period.end)
59+
60+
61+
class TrustedRoot(_TrustedRoot):
62+
"""Complete set of trusted entities for a Sigstore client"""
63+
64+
@classmethod
65+
def from_file(cls, path: str) -> "TrustedRoot":
66+
"""Create a new trust root from file"""
67+
tr: TrustedRoot = cls().from_json(Path(path).read_bytes())
68+
return tr
69+
70+
@classmethod
71+
def from_tuf(cls, url: str, offline: bool = False) -> "TrustedRoot":
72+
"""Create a new trust root from a TUF repository.
73+
74+
If `offline`, will use trust root in local TUF cache. Otherwise will
75+
update the trust root from remote TUF repository.
76+
"""
77+
path = TrustUpdater(url, offline).get_trusted_root_path()
78+
return cls.from_file(path)
79+
80+
@classmethod
81+
def production(cls, offline: bool = False) -> "TrustedRoot":
82+
"""Create new trust root from Sigstore production TUF repository.
83+
84+
If `offline`, will use trust root in local TUF cache. Otherwise will
85+
update the trust root from remote TUF repository.
86+
"""
87+
return cls.from_tuf(DEFAULT_TUF_URL, offline)
88+
89+
@classmethod
90+
def staging(cls, offline: bool = False) -> "TrustedRoot":
91+
"""Create new trust root from Sigstore staging TUF repository.
92+
93+
If `offline`, will use trust root in local TUF cache. Otherwise will
94+
update the trust root from remote TUF repository.
95+
"""
96+
return cls.from_tuf(STAGING_TUF_URL, offline)
97+
98+
@staticmethod
99+
def _get_tlog_keys(tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]:
100+
"""Return public key contents given transparency log instances."""
101+
102+
for key in tlogs:
103+
if not _is_timerange_valid(key.public_key.valid_for, allow_expired=False):
104+
continue
105+
key_bytes = key.public_key.raw_bytes
106+
if key_bytes:
107+
yield key_bytes
108+
109+
@staticmethod
110+
def _get_ca_keys(
111+
cas: list[CertificateAuthority], *, allow_expired: bool
112+
) -> Iterable[bytes]:
113+
"""Return public key contents given certificate authorities."""
114+
115+
for ca in cas:
116+
if not _is_timerange_valid(ca.valid_for, allow_expired=allow_expired):
117+
continue
118+
for cert in ca.cert_chain.certificates:
119+
yield cert.raw_bytes
120+
121+
def get_ctfe_keys(self) -> list[bytes]:
122+
"""Return the active CTFE public keys contents."""
123+
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs))
124+
if not ctfes:
125+
raise MetadataError("Active CTFE keys not found in trusted root")
126+
return ctfes
127+
128+
def get_rekor_keys(self) -> list[bytes]:
129+
"""Return the rekor public key content."""
130+
keys: list[bytes] = list(self._get_tlog_keys(self.tlogs))
131+
132+
if len(keys) != 1:
133+
raise MetadataError("Did not find one active Rekor key in trusted root")
134+
return keys
135+
136+
def get_fulcio_certs(self) -> list[Certificate]:
137+
"""Return the Fulcio certificates."""
138+
139+
certs: list[Certificate]
140+
141+
# Return expired certificates too: they are expired now but may have
142+
# been active when the certificate was used to sign.
143+
certs = [
144+
load_der_x509_certificate(c)
145+
for c in self._get_ca_keys(self.certificate_authorities, allow_expired=True)
146+
]
147+
148+
if not certs:
149+
raise MetadataError("Fulcio certificates not found in trusted root")
150+
return certs

0 commit comments

Comments
 (0)