diff --git a/boxsdk/client/client.py b/boxsdk/client/client.py index 67b4d4ed6..c09d082d8 100644 --- a/boxsdk/client/client.py +++ b/boxsdk/client/client.py @@ -19,6 +19,21 @@ from ..util.datetime_formatter import normalize_date_to_rfc3339_format from ..util.shared_link import get_shared_link_header from ..util.deprecation_decorator import deprecated +from ..auth.developer_token_auth import DeveloperTokenAuth +from ..auth.jwt_auth import JWTAuth +from ..auth.ccg_auth import CCGAuth +from ..auth.oauth2 import OAuth2 as LegacyOAuth2 +from ..util.token_storage_adapter import LegacyTokenStorageAdapter + +from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth +from box_sdk_gen.box.oauth import BoxOAuth, OAuthConfig +from box_sdk_gen.box.jwt_auth import BoxJWTAuth, JWTConfig +from box_sdk_gen.box.ccg_auth import BoxCCGAuth, CCGConfig +from box_sdk_gen.client import BoxClient +from box_sdk_gen.networking.network import NetworkSession +from box_sdk_gen.networking.base_urls import BaseUrls +from box_sdk_gen.networking.retries import BoxRetryStrategy +from box_sdk_gen.schemas.access_token import AccessToken if TYPE_CHECKING: from boxsdk import OAuth2 @@ -2009,3 +2024,316 @@ def get_ai_agent_default_config( session=self._session, response_object=box_response.json(), ) + + def get_authentication(self, *, token_storage=None): + """ + Extract authentication configuration from this legacy client and convert it + to a generated SDK Authentication object. + + This method supports the following legacy authentication types: + - DeveloperTokenAuth -> BoxDeveloperTokenAuth + - OAuth2 -> BoxOAuth + - JWTAuth -> BoxJWTAuth + - CCGAuth -> BoxCCGAuth + + :param token_storage: + Optional TokenStorage instance for the generated SDK. + If not provided, an adapter will be created to bridge legacy token storage. + :return: + Authentication object compatible with the generated SDK (box_sdk_gen). + :raises ValueError: + If the authentication type is not supported or required credentials are missing. + """ + oauth = self._oauth + + # Developer Token Authentication + if isinstance(oauth, DeveloperTokenAuth): + token = oauth.access_token + if not token: + raise ValueError("Developer token is not available") + return BoxDeveloperTokenAuth(token=token) + + # OAuth 2.0 Authentication + # Check if it's OAuth2 (but not DeveloperTokenAuth, JWTAuth, or CCGAuth) + if isinstance(oauth, LegacyOAuth2) and not isinstance( + oauth, (DeveloperTokenAuth, JWTAuth, CCGAuth) + ): + # It's OAuth2 + client_id = getattr(oauth, '_client_id', None) + client_secret = getattr(oauth, '_client_secret', None) + + if not client_id or not client_secret: + raise ValueError("OAuth2 client_id and client_secret are required") + + # Create token storage adapter if not provided + if token_storage is None: + # Create adapter from legacy OAuth2's token storage + def get_tokens(): + return oauth._get_tokens() + + def store_tokens(access_token, refresh_token): + oauth._store_tokens(access_token, refresh_token) + + token_storage = LegacyTokenStorageAdapter( + get_tokens=get_tokens, store_tokens=store_tokens + ) + + config = OAuthConfig( + client_id=client_id, + client_secret=client_secret, + token_storage=token_storage, + ) + + # Pre-populate with existing tokens if available + auth = BoxOAuth(config=config) + access_token, refresh_token = oauth._get_tokens() + if access_token: + existing_token = AccessToken( + access_token=access_token, + refresh_token=refresh_token, + expires_in=3600, # Default, actual expiry not available + token_type='bearer', + ) + token_storage.store(existing_token) + + return auth + + # JWT Authentication + if isinstance(oauth, JWTAuth): + client_id = getattr(oauth, '_client_id', None) + client_secret = getattr(oauth, '_client_secret', None) + jwt_key_id = getattr(oauth, '_jwt_key_id', None) + rsa_private_key = getattr(oauth, '_rsa_private_key', None) + enterprise_id = getattr(oauth, '_enterprise_id', None) + user_id = getattr(oauth, '_user_id', None) + + if not all([client_id, client_secret, jwt_key_id, rsa_private_key]): + raise ValueError( + "JWT authentication requires client_id, client_secret, jwt_key_id, and private key" + ) + + # Convert RSA private key to string format + # Note: If the key was originally encrypted, we can't extract the passphrase + # from the normalized RSAPrivateKey object. We'll serialize it unencrypted. + from cryptography.hazmat.primitives import serialization + + try: + # Serialize the key to PEM format (unencrypted) + # This works even if the original key was encrypted, as the + # normalized RSAPrivateKey object is already decrypted + private_key_pem = rsa_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode('utf-8') + # Passphrase is not needed since we're serializing unencrypted + # The generated SDK will handle encryption if needed + passphrase = '' + except Exception as e: + raise ValueError( + f"Cannot serialize private key: {e}. " + "Please ensure the private key is valid." + ) from e + + # Create token storage adapter if not provided + if token_storage is None: + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + token_storage = InMemoryTokenStorage() + + config = JWTConfig( + client_id=client_id, + client_secret=client_secret, + jwt_key_id=jwt_key_id, + private_key=private_key_pem, + private_key_passphrase=passphrase, + enterprise_id=enterprise_id, + user_id=user_id, + token_storage=token_storage, + ) + + auth = BoxJWTAuth(config=config) + + # Handle user vs enterprise scope + if user_id: + auth = auth.with_user_subject(user_id, token_storage=token_storage) + + return auth + + # CCG (Client Credentials Grant) Authentication + if isinstance(oauth, CCGAuth): + client_id = getattr(oauth, '_client_id', None) + client_secret = getattr(oauth, '_client_secret', None) + enterprise_id = getattr(oauth, '_enterprise_id', None) + user_id = getattr(oauth, '_user_id', None) + + if not client_id or not client_secret: + raise ValueError( + "CCG authentication requires client_id and client_secret" + ) + + # Create token storage adapter if not provided + if token_storage is None: + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + token_storage = InMemoryTokenStorage() + + config = CCGConfig( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + user_id=user_id, + token_storage=token_storage, + ) + + auth = BoxCCGAuth(config=config) + + # Handle user vs enterprise scope + if user_id: + auth = auth.with_user_subject(user_id, token_storage=token_storage) + + return auth + + raise ValueError( + f"Unsupported authentication type: {type(oauth).__name__}. " + "Supported types: DeveloperTokenAuth, OAuth2, JWTAuth, CCGAuth" + ) + + def get_network_session( + self, + *, + network_client=None, + retry_strategy=None, + data_sanitizer=None, + additional_headers=None, + ): + """ + Extract network configuration from this legacy client and convert it + to a generated SDK NetworkSession object. + + :param network_client: + Optional NetworkClient instance for the generated SDK. + If not provided, a default will be created with proxy support if configured. + :param retry_strategy: + Optional RetryStrategy instance for the generated SDK. + If not provided, one will be created from legacy retry settings. + :param data_sanitizer: + Optional DataSanitizer instance for the generated SDK. + :param additional_headers: + Optional dictionary of additional HTTP headers to merge with legacy headers. + :return: + NetworkSession object compatible with the generated SDK (box_sdk_gen). + """ + session = self._session + api_config = session.api_config + proxy_config = session.proxy_config + + # Extract base URLs + base_url = getattr(api_config, 'BASE_API_URL', 'https://api.box.com/2.0') + # Remove version suffix if present + if base_url.endswith('/2.0'): + base_url = base_url[:-4] + elif base_url.endswith('/2'): + base_url = base_url[:-2] + + upload_url = getattr(api_config, 'UPLOAD_URL', 'https://upload.box.com/api/2.0') + # Remove version suffix if present + if upload_url.endswith('/2.0'): + upload_url = upload_url[:-4] + elif upload_url.endswith('/2'): + upload_url = upload_url[:-2] + + oauth2_url = getattr( + api_config, 'OAUTH2_AUTHORIZE_URL', 'https://account.box.com/api/oauth2' + ) + # Extract base OAuth URL + if '/authorize' in oauth2_url: + oauth2_url = oauth2_url[: oauth2_url.rindex('/authorize')] + + base_urls = BaseUrls( + base_url=base_url, upload_url=upload_url, oauth_2_url=oauth2_url + ) + + # Extract or create retry strategy + if retry_strategy is None: + max_retries = getattr(api_config, 'MAX_RETRY_ATTEMPTS', 5) + retry_base_interval = getattr(session, '_retry_base_interval', 1.0) + retry_strategy = BoxRetryStrategy( + max_attempts=max_retries, retry_base_interval=retry_base_interval + ) + + # Handle proxy configuration + proxy_url = None + if proxy_config and hasattr(proxy_config, 'URL') and proxy_config.URL: + proxy_url = proxy_config.URL + # Handle authenticated proxy + if hasattr(proxy_config, 'AUTH') and proxy_config.AUTH: + auth = proxy_config.AUTH + if isinstance(auth, dict) and 'user' in auth and 'password' in auth: + scheme = ( + proxy_url.split('://', 1)[0] if '://' in proxy_url else 'http' + ) + # Extract host from URL + host = proxy_url.split('//')[1] if '//' in proxy_url else proxy_url + proxy_url = f"{scheme}://{auth['user']}:{auth['password']}@{host}" + + # Merge custom headers + headers = {} + if hasattr(session, '_default_headers'): + headers.update(session._default_headers.copy()) + if additional_headers: + headers.update(additional_headers) + + # Create network session + network_session = NetworkSession( + base_urls=base_urls, + network_client=network_client, + retry_strategy=retry_strategy, + additional_headers=headers if headers else None, + proxy_url=proxy_url, + data_sanitizer=data_sanitizer, + ) + + return network_session + + def get_sdk_gen_client(self, *, auth_options=None, network_options=None): + """ + Create a fully configured generated SDK client from this legacy client. + + This method combines get_authentication() and get_network_session() to create + a BoxClient instance that shares authentication and network configuration + with this legacy client. + + :param auth_options: + Optional dictionary with authentication options: + - token_storage: Custom TokenStorage instance + :param network_options: + Optional dictionary with network options: + - network_client: Custom NetworkClient instance + - retry_strategy: Custom RetryStrategy instance + - data_sanitizer: Custom DataSanitizer instance + - additional_headers: Dictionary of additional HTTP headers + :return: + BoxClient instance from box_sdk_gen, fully configured with shared settings. + """ + # Extract authentication + token_storage = None + if auth_options and 'token_storage' in auth_options: + token_storage = auth_options['token_storage'] + + auth = self.get_authentication(token_storage=token_storage) + + # Extract network session + network_kwargs = {} + if network_options: + network_kwargs = { + 'network_client': network_options.get('network_client'), + 'retry_strategy': network_options.get('retry_strategy'), + 'data_sanitizer': network_options.get('data_sanitizer'), + 'additional_headers': network_options.get('additional_headers'), + } + + network_session = self.get_network_session(**network_kwargs) + + # Create and return fully configured client + return BoxClient(auth=auth, network_session=network_session) diff --git a/boxsdk/util/token_storage_adapter.py b/boxsdk/util/token_storage_adapter.py new file mode 100644 index 000000000..65c6efbe4 --- /dev/null +++ b/boxsdk/util/token_storage_adapter.py @@ -0,0 +1,83 @@ +""" +Token storage adapter to bridge legacy OAuth2 token storage to generated SDK TokenStorage. + +This module provides adapters that allow legacy SDK token storage mechanisms +(callbacks, in-memory storage) to work with the generated SDK's TokenStorage interface. +""" + +from typing import Optional, Callable, Tuple +from box_sdk_gen.box.token_storage import TokenStorage +from box_sdk_gen.schemas.access_token import AccessToken + + +class LegacyTokenStorageAdapter(TokenStorage): + """ + Adapter that bridges legacy OAuth2 token storage (callbacks) to generated SDK TokenStorage. + + This adapter wraps legacy token storage mechanisms (store_tokens callback and + _get_tokens method) to provide a TokenStorage interface for the generated SDK. + """ + + def __init__( + self, + get_tokens: Callable[[], Tuple[Optional[str], Optional[str]]], + store_tokens: Optional[Callable[[str, Optional[str]], None]] = None, + ): + """ + Initialize the adapter with legacy token storage callbacks. + + :param get_tokens: + Callable that returns (access_token, refresh_token) tuple. + This can be a method from OAuth2._get_tokens or a custom callback. + :param store_tokens: + Optional callable that stores tokens. Takes (access_token, refresh_token). + If None, tokens will only be read from get_tokens but not persisted. + """ + self._get_tokens = get_tokens + self._store_tokens = store_tokens + + def store(self, token: AccessToken) -> None: + """ + Store a token using the legacy storage mechanism. + + :param token: + AccessToken object from generated SDK. + """ + if self._store_tokens is not None: + refresh_token = ( + token.refresh_token if hasattr(token, 'refresh_token') else None + ) + self._store_tokens(token.access_token, refresh_token) + + def get(self) -> Optional[AccessToken]: + """ + Get the current token from legacy storage. + + :return: + AccessToken object if tokens are available, None otherwise. + """ + access_token, refresh_token = self._get_tokens() + + if access_token is None: + return None + + # Convert legacy token format to generated SDK AccessToken + # The generated SDK AccessToken has: access_token, refresh_token, expires_in, token_type + # We don't have expires_in from legacy, so we'll set a default or calculate if possible + expires_in = 3600 # Default to 1 hour if not available + + return AccessToken( + access_token=access_token, + refresh_token=refresh_token, + expires_in=expires_in, + token_type='bearer', + ) + + def clear(self) -> None: + """ + Clear stored tokens using the legacy storage mechanism. + + Note: This will call store_tokens with None values if the callback supports it. + """ + if self._store_tokens is not None: + self._store_tokens(None, None) diff --git a/docs/config-sharing-implementation.md b/docs/config-sharing-implementation.md new file mode 100644 index 000000000..20e2d9947 --- /dev/null +++ b/docs/config-sharing-implementation.md @@ -0,0 +1,273 @@ +# Configuration Sharing Implementation + +This document describes the implementation of configuration sharing between the legacy `boxsdk` package and the new auto-generated `box_sdk_gen` package. + +## Overview + +The configuration sharing feature allows developers to seamlessly migrate from the legacy SDK to the generated SDK by automatically extracting and converting authentication and network configuration from a legacy client to a generated client. + +## Implementation Components + +### 1. Token Storage Adapter (`boxsdk/util/token_storage_adapter.py`) + +The `LegacyTokenStorageAdapter` class bridges the gap between legacy OAuth2 token storage mechanisms (callbacks) and the generated SDK's `TokenStorage` interface. + +**Key Features:** +- Converts legacy token format (access_token, refresh_token tuple) to generated SDK `AccessToken` objects +- Supports both read and write operations +- Handles token storage callbacks from legacy OAuth2 implementations + +### 2. Client Methods + +Three new methods have been added to the `Client` class in `boxsdk/client/client.py`: + +#### `get_authentication(token_storage=None)` + +Extracts authentication configuration from the legacy client and converts it to a generated SDK `Authentication` object. + +**Supported Authentication Types:** +- `DeveloperTokenAuth` → `BoxDeveloperTokenAuth` +- `OAuth2` → `BoxOAuth` +- `JWTAuth` → `BoxJWTAuth` +- `CCGAuth` → `BoxCCGAuth` + +**Parameters:** +- `token_storage` (optional): Custom `TokenStorage` instance. If not provided, an adapter will be created to bridge legacy token storage. + +**Returns:** +- `Authentication` object compatible with `box_sdk_gen` + +**Example:** +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +# Legacy client +legacy_auth = OAuth2(client_id="...", client_secret="...") +legacy_client = Client(legacy_auth) + +# Get generated SDK authentication +gen_auth = legacy_client.get_authentication() +``` + +#### `get_network_session(**options)` + +Extracts network configuration from the legacy client and converts it to a generated SDK `NetworkSession` object. + +**Parameters:** +- `network_client` (optional): Custom `NetworkClient` instance +- `retry_strategy` (optional): Custom `RetryStrategy` instance +- `data_sanitizer` (optional): Custom `DataSanitizer` instance +- `additional_headers` (optional): Dictionary of additional HTTP headers + +**Returns:** +- `NetworkSession` object compatible with `box_sdk_gen` + +**Configuration Mapping:** +- Base URLs: Extracted from `API` config +- Proxy settings: Extracted from `Proxy` config +- Retry strategy: Extracted from `API.MAX_RETRY_ATTEMPTS` and session retry settings +- Custom headers: Merged from session default headers and additional headers + +**Example:** +```python +network_session = legacy_client.get_network_session( + additional_headers={"X-Custom-Header": "value"} +) +``` + +#### `get_sdk_gen_client(auth_options=None, network_options=None)` + +Creates a fully configured generated SDK client from the legacy client. This is the main convenience method that combines `get_authentication()` and `get_network_session()`. + +**Parameters:** +- `auth_options` (optional): Dictionary with authentication options + - `token_storage`: Custom `TokenStorage` instance +- `network_options` (optional): Dictionary with network options + - `network_client`: Custom `NetworkClient` instance + - `retry_strategy`: Custom `RetryStrategy` instance + - `data_sanitizer`: Custom `DataSanitizer` instance + - `additional_headers`: Dictionary of additional HTTP headers + +**Returns:** +- `BoxClient` instance from `box_sdk_gen`, fully configured with shared settings + +**Example:** +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +# Legacy client setup +legacy_auth = OAuth2(client_id="...", client_secret="...") +legacy_client = Client(legacy_auth) + +# Get generated SDK client with one line +gen_client = legacy_client.get_sdk_gen_client() + +# Use generated client for new features +user = gen_client.users.get_user_by_id("me") +folders = gen_client.folders.get_folder_items("0") + +# Legacy client still works for existing code +legacy_user = legacy_client.user().get() +``` + +## Usage Examples + +### Developer Token + +```python +from boxsdk import Client +from boxsdk.auth import DeveloperTokenAuth + +legacy_auth = DeveloperTokenAuth() +legacy_client = Client(legacy_auth) + +# Get generated client +gen_client = legacy_client.get_sdk_gen_client() +``` + +### OAuth 2.0 with Token Refresh + +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +legacy_auth = OAuth2( + client_id="...", + client_secret="...", + access_token="...", + refresh_token="..." +) +legacy_client = Client(legacy_auth) + +# Get generated client with shared token storage +gen_client = legacy_client.get_sdk_gen_client() + +# Both clients share the same token storage +# Token refresh by either updates both +``` + +### JWT Authentication + +```python +from boxsdk import Client +from boxsdk.auth import JWTAuth + +legacy_auth = JWTAuth( + client_id="...", + client_secret="...", + enterprise_id="...", + jwt_key_id="...", + rsa_private_key_file_sys_path="path/to/key.pem" +) +legacy_client = Client(legacy_auth) + +# Get generated client +gen_client = legacy_client.get_sdk_gen_client() + +# Handle user vs enterprise scope +if user_id: + gen_auth = legacy_client.get_authentication() + gen_auth = gen_auth.with_user_subject(user_id) + gen_client = BoxClient(auth=gen_auth, network_session=legacy_client.get_network_session()) +``` + +### CCG Authentication + +```python +from boxsdk import Client +from boxsdk.auth import CCGAuth + +legacy_auth = CCGAuth( + client_id="...", + client_secret="...", + enterprise_id="..." +) +legacy_client = Client(legacy_auth) + +# Get generated client +gen_client = legacy_client.get_sdk_gen_client() +``` + +### Custom Network Configuration + +```python +from boxsdk import Client +from boxsdk.auth import OAuth2 + +legacy_client = Client(OAuth2(...)) + +# Get generated client with custom network options +gen_client = legacy_client.get_sdk_gen_client( + network_options={ + "additional_headers": {"X-Custom-Header": "value"}, + "retry_strategy": custom_retry_strategy + } +) +``` + +## Implementation Details + +### Token Storage Adapter + +The `LegacyTokenStorageAdapter` implements the `TokenStorage` interface from `box_sdk_gen`: + +- `store(token: AccessToken)`: Stores tokens using legacy storage mechanism +- `get() -> Optional[AccessToken]`: Retrieves tokens from legacy storage +- `clear()`: Clears stored tokens + +The adapter converts between: +- Legacy format: `(access_token: str, refresh_token: Optional[str])` +- Generated format: `AccessToken(access_token, refresh_token, expires_in, token_type)` + +### Authentication Conversion + +Each authentication type is handled specifically: + +1. **DeveloperTokenAuth**: Direct token extraction +2. **OAuth2**: Client ID/secret extraction + token storage adapter +3. **JWTAuth**: Full credential extraction including private key serialization +4. **CCGAuth**: Client ID/secret + enterprise/user ID extraction + +### Network Configuration Conversion + +Network settings are extracted from: +- `Session.api_config`: Base URLs, OAuth URLs +- `Session.proxy_config`: Proxy settings +- `Session._default_headers`: Custom headers +- `API.MAX_RETRY_ATTEMPTS`: Retry configuration + +## Limitations + +1. **JWT Private Key**: If the JWT private key was originally encrypted, the passphrase cannot be extracted from the normalized `RSAPrivateKey` object. The key will be serialized unencrypted. + +2. **Token Expiry**: The legacy SDK doesn't always track token expiry times, so the adapter uses a default value (3600 seconds) when converting to `AccessToken`. + +3. **Custom Token Storage**: If using custom token storage callbacks in the legacy SDK, ensure they're thread-safe if both clients will be used concurrently. + +## Error Handling + +The implementation raises: + +- `ValueError`: for unsupported auth types or missing credentials + +Note: Since `boxsdk` and `box_sdk_gen` are always installed together, import errors for `box_sdk_gen` should not occur in practice. + +## Testing + +Unit tests should cover: +- Token storage adapter conversion +- Each authentication type conversion +- Network configuration extraction +- Error cases (missing credentials, unsupported types) +- Integration tests for `get_sdk_gen_client()` + +## Future Enhancements + +Potential improvements: +1. Support for additional authentication types +2. Better token expiry tracking +3. Support for encrypted JWT keys with passphrase extraction +4. More comprehensive network configuration mapping + diff --git a/test/boxsdk/integration/test_config_sharing.py b/test/boxsdk/integration/test_config_sharing.py new file mode 100644 index 000000000..0414d0bb6 --- /dev/null +++ b/test/boxsdk/integration/test_config_sharing.py @@ -0,0 +1,339 @@ +""" +Integration tests for configuration sharing between legacy and generated SDKs. + +These tests verify that configuration sharing works end-to-end with real +authentication and network configurations. +""" + +from unittest.mock import Mock, MagicMock, patch +import pytest + +from boxsdk import Client +from boxsdk.auth.developer_token_auth import DeveloperTokenAuth +from boxsdk.auth.oauth2 import OAuth2 +from boxsdk.auth.jwt_auth import JWTAuth +from boxsdk.auth.ccg_auth import CCGAuth +from boxsdk.config import API, Proxy +from box_sdk_gen.client import BoxClient +from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth +from box_sdk_gen.box.oauth import BoxOAuth +from box_sdk_gen.box.jwt_auth import BoxJWTAuth +from box_sdk_gen.box.ccg_auth import BoxCCGAuth +from box_sdk_gen.networking.network import NetworkSession + + +class TestConfigSharingIntegration: + """Integration tests for configuration sharing.""" + + def test_developer_token_roundtrip(self, mock_box_session): + """Test that developer token auth works end-to-end.""" + token = 'test_dev_token_12345' + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + legacy_client = Client(legacy_auth, session=mock_box_session) + + # Get generated client + gen_client = legacy_client.get_sdk_gen_client() + + # Verify both clients are configured + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxDeveloperTokenAuth) + assert gen_client.auth.token == token + + # Verify network session is configured + assert isinstance(gen_client.network_session, NetworkSession) + + def test_oauth2_roundtrip(self, mock_box_session): + """Test that OAuth2 auth works end-to-end with token sharing.""" + client_id = 'test_client_id_123' + client_secret = 'test_client_secret_456' + access_token = 'test_access_token_789' + refresh_token = 'test_refresh_token_012' + + legacy_auth = OAuth2( + client_id=client_id, + client_secret=client_secret, + access_token=access_token, + refresh_token=refresh_token, + ) + + legacy_client = Client(legacy_auth, session=mock_box_session) + + # Get generated client + gen_client = legacy_client.get_sdk_gen_client() + + # Verify authentication + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxOAuth) + assert gen_client.auth.config.client_id == client_id + assert gen_client.auth.config.client_secret == client_secret + + # Verify token storage is shared + stored_token = gen_client.auth.token_storage.get() + assert stored_token is not None + assert stored_token.access_token == access_token + assert stored_token.refresh_token == refresh_token + + def test_oauth2_token_synchronization(self, mock_box_session): + """Test that tokens are synchronized between legacy and generated clients.""" + client_id = 'test_client_id' + client_secret = 'test_client_secret' + + # Track token storage + stored_tokens = {'access': 'initial_token', 'refresh': 'initial_refresh'} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + legacy_auth = OAuth2( + client_id=client_id, client_secret=client_secret, store_tokens=store_tokens + ) + legacy_auth._get_tokens = get_tokens + legacy_auth._store_tokens = store_tokens + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Update token via generated client + from box_sdk_gen.schemas.access_token import AccessToken + + new_token = AccessToken( + access_token='new_token', + refresh_token='new_refresh', + expires_in=3600, + token_type='bearer', + ) + gen_client.auth.token_storage.store(new_token) + + # Verify legacy client sees the update + access, refresh = legacy_auth._get_tokens() + assert access == 'new_token' + assert refresh == 'new_refresh' + + def test_network_configuration_preservation(self, mock_box_session): + """Test that network configuration is preserved.""" + # Set up custom API config + original_base_url = API.BASE_API_URL + original_upload_url = API.UPLOAD_URL + + API.BASE_API_URL = 'https://custom.api.box.com/2.0' + API.UPLOAD_URL = 'https://custom.upload.box.com/api/2.0' + + try: + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify base URLs are preserved + assert 'custom.api.box.com' in gen_client.network_session.base_urls.base_url + assert ( + 'custom.upload.box.com' + in gen_client.network_session.base_urls.upload_url + ) + finally: + # Restore original values + API.BASE_API_URL = original_base_url + API.UPLOAD_URL = original_upload_url + + def test_proxy_configuration_preservation(self, mock_box_session): + """Test that proxy configuration is preserved.""" + # Set up proxy + original_proxy_url = Proxy.URL + original_proxy_auth = Proxy.AUTH + + Proxy.URL = 'http://proxy.example.com:8080' + Proxy.AUTH = {'user': 'proxy_user', 'password': 'proxy_pass'} + + try: + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify proxy is configured + assert gen_client.network_session.proxy_url is not None + assert 'proxy.example.com' in gen_client.network_session.proxy_url + assert 'proxy_user' in gen_client.network_session.proxy_url + finally: + # Restore original values + Proxy.URL = original_proxy_url + Proxy.AUTH = original_proxy_auth + + def test_custom_headers_preservation(self, mock_box_session): + """Test that custom headers are preserved.""" + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + # Add custom headers to session + mock_box_session._default_headers = {'X-Custom-Header': 'custom_value'} + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify headers are preserved + assert 'X-Custom-Header' in gen_client.network_session.additional_headers + assert ( + gen_client.network_session.additional_headers['X-Custom-Header'] + == 'custom_value' + ) + + def test_retry_strategy_preservation(self, mock_box_session): + """Test that retry strategy is preserved.""" + # Modify API config + original_max_retries = API.MAX_RETRY_ATTEMPTS + API.MAX_RETRY_ATTEMPTS = 10 + + try: + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + gen_client = legacy_client.get_sdk_gen_client() + + # Verify retry strategy + assert gen_client.network_session.retry_strategy.max_attempts == 10 + finally: + API.MAX_RETRY_ATTEMPTS = original_max_retries + + def test_jwt_auth_roundtrip(self, mock_box_session): + """Test that JWT auth works end-to-end.""" + from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey + + # Create a mock RSA private key + mock_key = Mock(spec=RSAPrivateKey) + mock_key.private_bytes.return_value = ( + b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + ) + + client_id = 'test_jwt_client_id' + client_secret = 'test_jwt_client_secret' + jwt_key_id = 'test_jwt_key_id' + enterprise_id = 'test_enterprise_id' + + legacy_auth = JWTAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + jwt_key_id=jwt_key_id, + rsa_private_key_data=mock_key, + ) + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Verify authentication + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxJWTAuth) + assert gen_client.auth.config.client_id == client_id + assert gen_client.auth.config.client_secret == client_secret + assert gen_client.auth.config.jwt_key_id == jwt_key_id + assert gen_client.auth.config.enterprise_id == enterprise_id + + def test_ccg_auth_roundtrip(self, mock_box_session): + """Test that CCG auth works end-to-end.""" + client_id = 'test_ccg_client_id' + client_secret = 'test_ccg_client_secret' + enterprise_id = 'test_ccg_enterprise_id' + + legacy_auth = CCGAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + ) + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Verify authentication + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxCCGAuth) + assert gen_client.auth.config.client_id == client_id + assert gen_client.auth.config.client_secret == client_secret + assert gen_client.auth.config.enterprise_id == enterprise_id + + def test_parallel_client_usage(self, mock_box_session): + """Test that both legacy and generated clients can be used in parallel.""" + token = 'parallel_token' + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client() + + # Both clients should be functional + assert legacy_client.auth.access_token == token + assert gen_client.auth.token == token + + # Both should have network sessions + assert legacy_client.session is not None + assert gen_client.network_session is not None + + def test_get_authentication_with_custom_token_storage(self, mock_box_session): + """Test get_authentication() with custom token storage.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + legacy_auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh', + ) + + custom_storage = InMemoryTokenStorage() + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_auth = legacy_client.get_authentication(token_storage=custom_storage) + + assert isinstance(gen_auth, BoxOAuth) + assert gen_auth.config.token_storage is custom_storage + + def test_get_network_session_with_custom_options(self, mock_box_session): + """Test get_network_session() with custom options.""" + from box_sdk_gen.networking.retries import BoxRetryStrategy + + legacy_auth = DeveloperTokenAuth(get_new_token_callback=lambda: 'token') + legacy_client = Client(legacy_auth, session=mock_box_session) + + custom_retry = BoxRetryStrategy(max_attempts=20, retry_base_interval=2.5) + custom_headers = {'X-Custom': 'value'} + + network_session = legacy_client.get_network_session( + retry_strategy=custom_retry, additional_headers=custom_headers + ) + + assert network_session.retry_strategy.max_attempts == 20 + assert network_session.retry_strategy.retry_base_interval == 2.5 + assert network_session.additional_headers['X-Custom'] == 'value' + + def test_get_sdk_gen_client_with_all_options(self, mock_box_session): + """Test get_sdk_gen_client() with all options.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + from box_sdk_gen.networking.retries import BoxRetryStrategy + + legacy_auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh', + ) + + custom_storage = InMemoryTokenStorage() + custom_retry = BoxRetryStrategy(max_attempts=25) + custom_headers = {'X-Full-Test': 'full_value'} + + legacy_client = Client(legacy_auth, session=mock_box_session) + gen_client = legacy_client.get_sdk_gen_client( + auth_options={'token_storage': custom_storage}, + network_options={ + 'retry_strategy': custom_retry, + 'additional_headers': custom_headers, + }, + ) + + # Verify all options are applied + assert gen_client.auth.config.token_storage is custom_storage + assert gen_client.network_session.retry_strategy.max_attempts == 25 + assert ( + gen_client.network_session.additional_headers['X-Full-Test'] == 'full_value' + ) diff --git a/test/boxsdk/unit/client/test_config_sharing.py b/test/boxsdk/unit/client/test_config_sharing.py new file mode 100644 index 000000000..23a37c3fe --- /dev/null +++ b/test/boxsdk/unit/client/test_config_sharing.py @@ -0,0 +1,395 @@ +""" +Unit tests for configuration sharing methods in Client class. + +Tests get_authentication(), get_network_session(), and get_sdk_gen_client() methods. +""" + +from unittest.mock import Mock +import pytest + +from boxsdk.client import Client +from boxsdk.auth.developer_token_auth import DeveloperTokenAuth +from boxsdk.auth.oauth2 import OAuth2 +from boxsdk.auth.jwt_auth import JWTAuth +from boxsdk.auth.ccg_auth import CCGAuth +from boxsdk.config import API, Proxy +from boxsdk.session.session import Session, AuthorizedSession +from box_sdk_gen.box.developer_token_auth import BoxDeveloperTokenAuth +from box_sdk_gen.box.oauth import BoxOAuth +from box_sdk_gen.box.jwt_auth import BoxJWTAuth +from box_sdk_gen.box.ccg_auth import BoxCCGAuth +from box_sdk_gen.networking.network import NetworkSession +from box_sdk_gen.networking.base_urls import BaseUrls +from box_sdk_gen.client import BoxClient + + +class TestGetAuthentication: + """Test cases for get_authentication() method.""" + + def test_get_authentication_developer_token(self, mock_box_session): + """Test converting DeveloperTokenAuth to BoxDeveloperTokenAuth.""" + token = 'dev_token_123' + auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxDeveloperTokenAuth) + assert gen_auth.token == token + + def test_get_authentication_developer_token_missing_token(self, mock_box_session): + """Test that missing developer token raises ValueError.""" + # Create auth with callback that returns None + auth = DeveloperTokenAuth(get_new_token_callback=lambda: None) + auth._access_token = None + + client = Client(auth, session=mock_box_session) + + with pytest.raises(ValueError, match="Developer token is not available"): + client.get_authentication() + + def test_get_authentication_oauth2(self, mock_box_session): + """Test converting OAuth2 to BoxOAuth.""" + client_id = 'test_client_id' + client_secret = 'test_client_secret' + access_token = 'test_access_token' + refresh_token = 'test_refresh_token' + + auth = OAuth2( + client_id=client_id, + client_secret=client_secret, + access_token=access_token, + refresh_token=refresh_token, + ) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxOAuth) + assert gen_auth.config.client_id == client_id + assert gen_auth.config.client_secret == client_secret + + def test_get_authentication_oauth2_missing_credentials(self, mock_box_session): + """Test that missing OAuth2 credentials raises ValueError.""" + auth = OAuth2(client_id=None, client_secret=None) + + client = Client(auth, session=mock_box_session) + + with pytest.raises( + ValueError, match="OAuth2 client_id and client_secret are required" + ): + client.get_authentication() + + def test_get_authentication_oauth2_with_custom_token_storage( + self, mock_box_session + ): + """Test OAuth2 conversion with custom token storage.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh', + ) + + custom_storage = InMemoryTokenStorage() + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication(token_storage=custom_storage) + + assert isinstance(gen_auth, BoxOAuth) + assert gen_auth.config.token_storage is custom_storage + + def test_get_authentication_jwt(self, mock_box_session): + """Test converting JWTAuth to BoxJWTAuth.""" + from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey + from cryptography.hazmat.primitives import serialization as crypto_serialization + + # Create a mock RSA private key + mock_key = Mock(spec=RSAPrivateKey) + mock_key.private_bytes.return_value = ( + b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + ) + + client_id = 'test_client_id' + client_secret = 'test_client_secret' + jwt_key_id = 'test_key_id' + enterprise_id = 'test_enterprise_id' + + auth = JWTAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + jwt_key_id=jwt_key_id, + rsa_private_key_data=mock_key, + ) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxJWTAuth) + assert gen_auth.config.client_id == client_id + assert gen_auth.config.client_secret == client_secret + assert gen_auth.config.jwt_key_id == jwt_key_id + assert gen_auth.config.enterprise_id == enterprise_id + + def test_get_authentication_jwt_missing_credentials(self, mock_box_session): + """Test that missing JWT credentials raises ValueError.""" + from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey + + # Create a mock RSA private key + mock_key = Mock(spec=RSAPrivateKey) + mock_key.private_bytes.return_value = ( + b'-----BEGIN PRIVATE KEY-----\nMOCK_KEY\n-----END PRIVATE KEY-----' + ) + + # Create auth with missing client_id + auth = JWTAuth( + client_id=None, # Missing required field + client_secret='test_secret', + enterprise_id='test_enterprise', + jwt_key_id='test_key_id', + rsa_private_key_data=mock_key, + ) + + client = Client(auth, session=mock_box_session) + + with pytest.raises(ValueError, match="JWT authentication requires"): + client.get_authentication() + + def test_get_authentication_ccg(self, mock_box_session): + """Test converting CCGAuth to BoxCCGAuth.""" + client_id = 'test_client_id' + client_secret = 'test_client_secret' + enterprise_id = 'test_enterprise_id' + + auth = CCGAuth( + client_id=client_id, + client_secret=client_secret, + enterprise_id=enterprise_id, + ) + + client = Client(auth, session=mock_box_session) + gen_auth = client.get_authentication() + + assert isinstance(gen_auth, BoxCCGAuth) + assert gen_auth.config.client_id == client_id + assert gen_auth.config.client_secret == client_secret + assert gen_auth.config.enterprise_id == enterprise_id + + def test_get_authentication_ccg_missing_credentials(self, mock_box_session): + """Test that missing CCG credentials raises ValueError.""" + auth = CCGAuth(client_id=None, client_secret=None) + + client = Client(auth, session=mock_box_session) + + with pytest.raises(ValueError, match="CCG authentication requires"): + client.get_authentication() + + def test_get_authentication_unsupported_type(self, mock_box_session): + """Test that unsupported auth type raises ValueError.""" + # Create a mock auth that's not one of the supported types + mock_auth = Mock() + mock_auth.__class__.__name__ = 'UnsupportedAuth' + + client = Client(mock_auth, session=mock_box_session) + + with pytest.raises(ValueError, match="Unsupported authentication type"): + client.get_authentication() + + +class TestGetNetworkSession: + """Test cases for get_network_session() method.""" + + def test_get_network_session_default(self, mock_box_session): + """Test extracting network session with default settings.""" + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + assert isinstance(network_session, NetworkSession) + assert isinstance(network_session.base_urls, BaseUrls) + + def test_get_network_session_with_custom_headers(self, mock_box_session): + """Test network session with custom headers.""" + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + additional_headers = {'X-Custom-Header': 'custom_value'} + network_session = client.get_network_session( + additional_headers=additional_headers + ) + + assert 'X-Custom-Header' in network_session.additional_headers + assert network_session.additional_headers['X-Custom-Header'] == 'custom_value' + + def test_get_network_session_with_proxy(self, mock_box_session): + """Test network session with proxy configuration.""" + # Set up proxy config + Proxy.URL = 'http://proxy.example.com:8080' + Proxy.AUTH = None + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + # Proxy URL should be set + assert network_session.proxy_url is not None + assert 'proxy.example.com' in network_session.proxy_url + + # Clean up + Proxy.URL = None + + def test_get_network_session_with_authenticated_proxy(self, mock_box_session): + """Test network session with authenticated proxy.""" + # Set up authenticated proxy config + Proxy.URL = 'http://proxy.example.com:8080' + Proxy.AUTH = {'user': 'proxy_user', 'password': 'proxy_pass'} + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + # Proxy URL should include authentication + assert network_session.proxy_url is not None + assert 'proxy_user' in network_session.proxy_url + assert 'proxy_pass' in network_session.proxy_url + + # Clean up + Proxy.URL = None + Proxy.AUTH = None + + def test_get_network_session_with_custom_retry_strategy(self, mock_box_session): + """Test network session with custom retry strategy.""" + from box_sdk_gen.networking.retries import BoxRetryStrategy + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + custom_retry = BoxRetryStrategy(max_attempts=10, retry_base_interval=2.0) + network_session = client.get_network_session(retry_strategy=custom_retry) + + assert network_session.retry_strategy is custom_retry + assert network_session.retry_strategy.max_attempts == 10 + + def test_get_network_session_base_urls(self, mock_box_session): + """Test that base URLs are correctly extracted.""" + # Modify API config + original_base_url = API.BASE_API_URL + original_upload_url = API.UPLOAD_URL + + API.BASE_API_URL = 'https://custom.api.box.com/2.0' + API.UPLOAD_URL = 'https://custom.upload.box.com/api/2.0' + + auth = Mock(OAuth2) + client = Client(auth, session=mock_box_session) + + network_session = client.get_network_session() + + # URLs should have version suffix removed + assert 'custom.api.box.com' in network_session.base_urls.base_url + assert 'custom.upload.box.com' in network_session.base_urls.upload_url + + # Restore + API.BASE_API_URL = original_base_url + API.UPLOAD_URL = original_upload_url + + +class TestGetSdkGenClient: + """Test cases for get_sdk_gen_client() method.""" + + def test_get_sdk_gen_client_basic(self, mock_box_session): + """Test basic usage of get_sdk_gen_client().""" + token = 'dev_token' + auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client() + + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxDeveloperTokenAuth) + assert gen_client.auth.token == token + + def test_get_sdk_gen_client_oauth2(self, mock_box_session): + """Test get_sdk_gen_client() with OAuth2.""" + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh', + ) + + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client() + + assert isinstance(gen_client, BoxClient) + assert isinstance(gen_client.auth, BoxOAuth) + + def test_get_sdk_gen_client_with_auth_options(self, mock_box_session): + """Test get_sdk_gen_client() with custom auth options.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh', + ) + + custom_storage = InMemoryTokenStorage() + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client( + auth_options={'token_storage': custom_storage} + ) + + assert isinstance(gen_client, BoxClient) + assert gen_client.auth.config.token_storage is custom_storage + + def test_get_sdk_gen_client_with_network_options(self, mock_box_session): + """Test get_sdk_gen_client() with custom network options.""" + from box_sdk_gen.networking.retries import BoxRetryStrategy + + token = 'dev_token' + auth = DeveloperTokenAuth(get_new_token_callback=lambda: token) + + custom_retry = BoxRetryStrategy(max_attempts=10) + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client( + network_options={'retry_strategy': custom_retry} + ) + + assert isinstance(gen_client, BoxClient) + assert gen_client.network_session.retry_strategy.max_attempts == 10 + + def test_get_sdk_gen_client_with_all_options(self, mock_box_session): + """Test get_sdk_gen_client() with both auth and network options.""" + from box_sdk_gen.box.token_storage import InMemoryTokenStorage + from box_sdk_gen.networking.retries import BoxRetryStrategy + + auth = OAuth2( + client_id='test_id', + client_secret='test_secret', + access_token='token', + refresh_token='refresh', + ) + + custom_storage = InMemoryTokenStorage() + custom_retry = BoxRetryStrategy(max_attempts=15) + additional_headers = {'X-Test': 'value'} + + client = Client(auth, session=mock_box_session) + gen_client = client.get_sdk_gen_client( + auth_options={'token_storage': custom_storage}, + network_options={ + 'retry_strategy': custom_retry, + 'additional_headers': additional_headers, + }, + ) + + assert isinstance(gen_client, BoxClient) + assert gen_client.auth.config.token_storage is custom_storage + assert gen_client.network_session.retry_strategy.max_attempts == 15 + assert gen_client.network_session.additional_headers['X-Test'] == 'value' diff --git a/test/boxsdk/unit/util/test_token_storage_adapter.py b/test/boxsdk/unit/util/test_token_storage_adapter.py new file mode 100644 index 000000000..71d4db611 --- /dev/null +++ b/test/boxsdk/unit/util/test_token_storage_adapter.py @@ -0,0 +1,183 @@ +""" +Unit tests for token storage adapter. + +Tests the LegacyTokenStorageAdapter that bridges legacy OAuth2 token storage +to generated SDK TokenStorage interface. +""" + +from unittest.mock import Mock, MagicMock +import pytest + +from boxsdk.util.token_storage_adapter import LegacyTokenStorageAdapter +from box_sdk_gen.schemas.access_token import AccessToken + + +class TestLegacyTokenStorageAdapter: + """Test cases for LegacyTokenStorageAdapter.""" + + def test_store_with_store_callback(self): + """Test storing tokens using the legacy store callback.""" + stored_tokens = {} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, store_tokens=store_tokens + ) + + token = AccessToken( + access_token='test_access_token', + refresh_token='test_refresh_token', + expires_in=3600, + token_type='bearer', + ) + + adapter.store(token) + + assert stored_tokens['access'] == 'test_access_token' + assert stored_tokens['refresh'] == 'test_refresh_token' + + def test_store_without_store_callback(self): + """Test that store works even without a store callback.""" + + def get_tokens(): + return (None, None) + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + + token = AccessToken( + access_token='test_access_token', + refresh_token='test_refresh_token', + expires_in=3600, + token_type='bearer', + ) + + # Should not raise an error + adapter.store(token) + + def test_get_with_tokens(self): + """Test retrieving tokens when they exist.""" + + def get_tokens(): + return ('test_access_token', 'test_refresh_token') + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + + token = adapter.get() + + assert token is not None + assert token.access_token == 'test_access_token' + assert token.refresh_token == 'test_refresh_token' + assert token.expires_in == 3600 # Default value + assert token.token_type == 'bearer' + + def test_get_without_tokens(self): + """Test retrieving tokens when they don't exist.""" + + def get_tokens(): + return (None, None) + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + + token = adapter.get() + + assert token is None + + def test_get_with_only_access_token(self): + """Test retrieving when only access token is available.""" + + def get_tokens(): + return ('test_access_token', None) + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + + token = adapter.get() + + assert token is not None + assert token.access_token == 'test_access_token' + assert token.refresh_token is None + + def test_clear_with_store_callback(self): + """Test clearing tokens using the legacy store callback.""" + stored_tokens = {'access': 'token', 'refresh': 'refresh_token'} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + if access_token is None: + stored_tokens.clear() + else: + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, store_tokens=store_tokens + ) + + adapter.clear() + + assert stored_tokens == {} + + def test_clear_without_store_callback(self): + """Test that clear works even without a store callback.""" + + def get_tokens(): + return ('token', 'refresh_token') + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + + # Should not raise an error + adapter.clear() + + def test_token_conversion_format(self): + """Test that tokens are converted to the correct AccessToken format.""" + + def get_tokens(): + return ('access_123', 'refresh_456') + + adapter = LegacyTokenStorageAdapter(get_tokens=get_tokens, store_tokens=None) + + token = adapter.get() + + assert isinstance(token, AccessToken) + assert token.access_token == 'access_123' + assert token.refresh_token == 'refresh_456' + assert token.expires_in == 3600 + assert token.token_type == 'bearer' + + def test_store_and_get_roundtrip(self): + """Test storing and retrieving tokens in a roundtrip.""" + stored_tokens = {} + + def get_tokens(): + return (stored_tokens.get('access'), stored_tokens.get('refresh')) + + def store_tokens(access_token, refresh_token): + stored_tokens['access'] = access_token + stored_tokens['refresh'] = refresh_token + + adapter = LegacyTokenStorageAdapter( + get_tokens=get_tokens, store_tokens=store_tokens + ) + + # Store a token + original_token = AccessToken( + access_token='stored_access', + refresh_token='stored_refresh', + expires_in=7200, + token_type='bearer', + ) + adapter.store(original_token) + + # Retrieve it + retrieved_token = adapter.get() + + assert retrieved_token is not None + assert retrieved_token.access_token == 'stored_access' + assert retrieved_token.refresh_token == 'stored_refresh'