Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 328 additions & 0 deletions boxsdk/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@
from ..util.datetime_formatter import normalize_date_to_rfc3339_format
from ..util.shared_link import get_shared_link_header
from ..util.deprecation_decorator import deprecated
from ..auth.developer_token_auth import DeveloperTokenAuth
from ..auth.jwt_auth import JWTAuth
from ..auth.ccg_auth import CCGAuth
from ..auth.oauth2 import OAuth2 as LegacyOAuth2
from ..util.token_storage_adapter import LegacyTokenStorageAdapter

from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth
from box_sdk_gen.box.oauth import BoxOAuth, OAuthConfig
from box_sdk_gen.box.jwt_auth import BoxJWTAuth, JWTConfig
from box_sdk_gen.box.ccg_auth import BoxCCGAuth, CCGConfig
from box_sdk_gen.client import BoxClient
from box_sdk_gen.networking.network import NetworkSession
from box_sdk_gen.networking.base_urls import BaseUrls
from box_sdk_gen.networking.retries import BoxRetryStrategy
from box_sdk_gen.schemas.access_token import AccessToken

if TYPE_CHECKING:
from boxsdk import OAuth2
Expand Down Expand Up @@ -2009,3 +2024,316 @@ def get_ai_agent_default_config(
session=self._session,
response_object=box_response.json(),
)

def get_authentication(self, *, token_storage=None):
"""
Extract authentication configuration from this legacy client and convert it
to a generated SDK Authentication object.

This method supports the following legacy authentication types:
- DeveloperTokenAuth -> BoxDeveloperTokenAuth
- OAuth2 -> BoxOAuth
- JWTAuth -> BoxJWTAuth
- CCGAuth -> BoxCCGAuth

:param token_storage:
Optional TokenStorage instance for the generated SDK.
If not provided, an adapter will be created to bridge legacy token storage.
:return:
Authentication object compatible with the generated SDK (box_sdk_gen).
:raises ValueError:
If the authentication type is not supported or required credentials are missing.
"""
oauth = self._oauth

# Developer Token Authentication
if isinstance(oauth, DeveloperTokenAuth):
token = oauth.access_token
if not token:
raise ValueError("Developer token is not available")
return BoxDeveloperTokenAuth(token=token)

# OAuth 2.0 Authentication
# Check if it's OAuth2 (but not DeveloperTokenAuth, JWTAuth, or CCGAuth)
if isinstance(oauth, LegacyOAuth2) and not isinstance(
oauth, (DeveloperTokenAuth, JWTAuth, CCGAuth)
):
# It's OAuth2
client_id = getattr(oauth, '_client_id', None)
client_secret = getattr(oauth, '_client_secret', None)

if not client_id or not client_secret:
raise ValueError("OAuth2 client_id and client_secret are required")

# Create token storage adapter if not provided
if token_storage is None:
# Create adapter from legacy OAuth2's token storage
def get_tokens():
return oauth._get_tokens()

def store_tokens(access_token, refresh_token):
oauth._store_tokens(access_token, refresh_token)

token_storage = LegacyTokenStorageAdapter(
get_tokens=get_tokens, store_tokens=store_tokens
)

config = OAuthConfig(
client_id=client_id,
client_secret=client_secret,
token_storage=token_storage,
)

# Pre-populate with existing tokens if available
auth = BoxOAuth(config=config)
access_token, refresh_token = oauth._get_tokens()
if access_token:
existing_token = AccessToken(
access_token=access_token,
refresh_token=refresh_token,
expires_in=3600, # Default, actual expiry not available
token_type='bearer',
)
token_storage.store(existing_token)

return auth

# JWT Authentication
if isinstance(oauth, JWTAuth):
client_id = getattr(oauth, '_client_id', None)
client_secret = getattr(oauth, '_client_secret', None)
jwt_key_id = getattr(oauth, '_jwt_key_id', None)
rsa_private_key = getattr(oauth, '_rsa_private_key', None)
enterprise_id = getattr(oauth, '_enterprise_id', None)
user_id = getattr(oauth, '_user_id', None)

if not all([client_id, client_secret, jwt_key_id, rsa_private_key]):
raise ValueError(
"JWT authentication requires client_id, client_secret, jwt_key_id, and private key"
)

# Convert RSA private key to string format
# Note: If the key was originally encrypted, we can't extract the passphrase
# from the normalized RSAPrivateKey object. We'll serialize it unencrypted.
from cryptography.hazmat.primitives import serialization

try:
# Serialize the key to PEM format (unencrypted)
# This works even if the original key was encrypted, as the
# normalized RSAPrivateKey object is already decrypted
private_key_pem = rsa_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode('utf-8')
# Passphrase is not needed since we're serializing unencrypted
# The generated SDK will handle encryption if needed
passphrase = ''
except Exception as e:
raise ValueError(
f"Cannot serialize private key: {e}. "
"Please ensure the private key is valid."
) from e

# Create token storage adapter if not provided
if token_storage is None:
from box_sdk_gen.box.token_storage import InMemoryTokenStorage

token_storage = InMemoryTokenStorage()

config = JWTConfig(
client_id=client_id,
client_secret=client_secret,
jwt_key_id=jwt_key_id,
private_key=private_key_pem,
private_key_passphrase=passphrase,
enterprise_id=enterprise_id,
user_id=user_id,
token_storage=token_storage,
)

auth = BoxJWTAuth(config=config)

# Handle user vs enterprise scope
if user_id:
auth = auth.with_user_subject(user_id, token_storage=token_storage)

return auth

# CCG (Client Credentials Grant) Authentication
if isinstance(oauth, CCGAuth):
client_id = getattr(oauth, '_client_id', None)
client_secret = getattr(oauth, '_client_secret', None)
enterprise_id = getattr(oauth, '_enterprise_id', None)
user_id = getattr(oauth, '_user_id', None)

if not client_id or not client_secret:
raise ValueError(
"CCG authentication requires client_id and client_secret"
)

# Create token storage adapter if not provided
if token_storage is None:
from box_sdk_gen.box.token_storage import InMemoryTokenStorage

token_storage = InMemoryTokenStorage()

config = CCGConfig(
client_id=client_id,
client_secret=client_secret,
enterprise_id=enterprise_id,
user_id=user_id,
token_storage=token_storage,
)

auth = BoxCCGAuth(config=config)

# Handle user vs enterprise scope
if user_id:
auth = auth.with_user_subject(user_id, token_storage=token_storage)

return auth

raise ValueError(
f"Unsupported authentication type: {type(oauth).__name__}. "
"Supported types: DeveloperTokenAuth, OAuth2, JWTAuth, CCGAuth"
)

def get_network_session(
self,
*,
network_client=None,
retry_strategy=None,
data_sanitizer=None,
additional_headers=None,
):
"""
Extract network configuration from this legacy client and convert it
to a generated SDK NetworkSession object.

:param network_client:
Optional NetworkClient instance for the generated SDK.
If not provided, a default will be created with proxy support if configured.
:param retry_strategy:
Optional RetryStrategy instance for the generated SDK.
If not provided, one will be created from legacy retry settings.
:param data_sanitizer:
Optional DataSanitizer instance for the generated SDK.
:param additional_headers:
Optional dictionary of additional HTTP headers to merge with legacy headers.
:return:
NetworkSession object compatible with the generated SDK (box_sdk_gen).
"""
session = self._session
api_config = session.api_config
proxy_config = session.proxy_config

# Extract base URLs
base_url = getattr(api_config, 'BASE_API_URL', 'https://api.box.com/2.0')
# Remove version suffix if present
if base_url.endswith('/2.0'):
base_url = base_url[:-4]
elif base_url.endswith('/2'):
base_url = base_url[:-2]

upload_url = getattr(api_config, 'UPLOAD_URL', 'https://upload.box.com/api/2.0')
# Remove version suffix if present
if upload_url.endswith('/2.0'):
upload_url = upload_url[:-4]
elif upload_url.endswith('/2'):
upload_url = upload_url[:-2]

oauth2_url = getattr(
api_config, 'OAUTH2_AUTHORIZE_URL', 'https://account.box.com/api/oauth2'
)
# Extract base OAuth URL
if '/authorize' in oauth2_url:
oauth2_url = oauth2_url[: oauth2_url.rindex('/authorize')]

base_urls = BaseUrls(
base_url=base_url, upload_url=upload_url, oauth_2_url=oauth2_url
)

# Extract or create retry strategy
if retry_strategy is None:
max_retries = getattr(api_config, 'MAX_RETRY_ATTEMPTS', 5)
retry_base_interval = getattr(session, '_retry_base_interval', 1.0)
retry_strategy = BoxRetryStrategy(
max_attempts=max_retries, retry_base_interval=retry_base_interval
)

# Handle proxy configuration
proxy_url = None
if proxy_config and hasattr(proxy_config, 'URL') and proxy_config.URL:
proxy_url = proxy_config.URL
# Handle authenticated proxy
if hasattr(proxy_config, 'AUTH') and proxy_config.AUTH:
auth = proxy_config.AUTH
if isinstance(auth, dict) and 'user' in auth and 'password' in auth:
scheme = (
proxy_url.split('://', 1)[0] if '://' in proxy_url else 'http'
)
# Extract host from URL
host = proxy_url.split('//')[1] if '//' in proxy_url else proxy_url
proxy_url = f"{scheme}://{auth['user']}:{auth['password']}@{host}"

# Merge custom headers
headers = {}
if hasattr(session, '_default_headers'):
headers.update(session._default_headers.copy())
if additional_headers:
headers.update(additional_headers)

# Create network session
network_session = NetworkSession(
base_urls=base_urls,
network_client=network_client,
retry_strategy=retry_strategy,
additional_headers=headers if headers else None,
proxy_url=proxy_url,
data_sanitizer=data_sanitizer,
)

return network_session

def get_sdk_gen_client(self, *, auth_options=None, network_options=None):
"""
Create a fully configured generated SDK client from this legacy client.

This method combines get_authentication() and get_network_session() to create
a BoxClient instance that shares authentication and network configuration
with this legacy client.

:param auth_options:
Optional dictionary with authentication options:
- token_storage: Custom TokenStorage instance
:param network_options:
Optional dictionary with network options:
- network_client: Custom NetworkClient instance
- retry_strategy: Custom RetryStrategy instance
- data_sanitizer: Custom DataSanitizer instance
- additional_headers: Dictionary of additional HTTP headers
:return:
BoxClient instance from box_sdk_gen, fully configured with shared settings.
"""
# Extract authentication
token_storage = None
if auth_options and 'token_storage' in auth_options:
token_storage = auth_options['token_storage']

auth = self.get_authentication(token_storage=token_storage)

# Extract network session
network_kwargs = {}
if network_options:
network_kwargs = {
'network_client': network_options.get('network_client'),
'retry_strategy': network_options.get('retry_strategy'),
'data_sanitizer': network_options.get('data_sanitizer'),
'additional_headers': network_options.get('additional_headers'),
}

network_session = self.get_network_session(**network_kwargs)

# Create and return fully configured client
return BoxClient(auth=auth, network_session=network_session)
Loading