Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ datafusion-expr = { version = "50.0.0" }
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "439cbd2282504c3ffaf262f1ffdb530a0fb1a151" }
datafusion-macros = { version = "50.0.0" }
datafusion-physical-plan = { version = "50.0.0" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f76b73573267304d2f866ed0dba91587f928caa6" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "a5fb43869d8c09d488343c24ea18db01752fb1bb" }
futures = { version = "0.3" }
http = "1.2"
http-body-util = "0.1.0"
iceberg = { git = "https://github.com/apache/iceberg-rust.git", rev="7a5ad1fcaf00d4638857812bab788105f6c60573"}
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f76b73573267304d2f866ed0dba91587f928caa6" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f76b73573267304d2f866ed0dba91587f928caa6" }
iceberg-rust-spec = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f76b73573267304d2f866ed0dba91587f928caa6" }
iceberg-s3tables-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f76b73573267304d2f866ed0dba91587f928caa6" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "a5fb43869d8c09d488343c24ea18db01752fb1bb" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "a5fb43869d8c09d488343c24ea18db01752fb1bb" }
iceberg-rust-spec = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "a5fb43869d8c09d488343c24ea18db01752fb1bb" }
iceberg-s3tables-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "a5fb43869d8c09d488343c24ea18db01752fb1bb" }
indexmap = "2.7.1"
jsonwebtoken = "9.3.1"
lazy_static = { version = "1.5" }
Expand Down
2 changes: 2 additions & 0 deletions crates/catalog-metastore/src/metastore_bootstrap_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl MetastoreBootstrapConfig {
pub async fn load_from_env() -> Result<Self, ConfigError> {
let mut config = Self::default();
if let Some(volume) = load_volume_from_env().await? {
tracing::info!("Loading volume from environment");
config.volumes.push(volume);
}
Ok(config)
Expand Down Expand Up @@ -437,6 +438,7 @@ async fn load_volume_from_env() -> Result<Option<VolumeEntry>, ConfigError> {
.await?;

validate_s3tables_credentials(&arn, &credentials).await?;
tracing::info!("Loaded volume has been validated");

VolumeType::S3Tables(S3TablesVolume {
endpoint: None,
Expand Down
4 changes: 4 additions & 0 deletions crates/state-store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ AWS_REGION=us-east-2 AWS_ACCESS_KEY_ID=local AWS_SECRET_ACCESS_KEY=local dynamod
```bash
STATESTORE_TABLE_NAME=embucket-statestore
STATESTORE_DYNAMODB_ENDPOINT=http://localhost:8000
AWS_DDB_ACCESS_KEY_ID=key
AWS_DDB_SECRET_ACCESS_KEY=secret
# For temporary credentials
AWS_DDB_SESSION_TOKEN=token
```
32 changes: 25 additions & 7 deletions crates/state-store/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::error::{DynamoDbCredentialsSnafu, Error, Result};
use aws_config::Region;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::{BehaviorVersion, defaults};
use aws_credential_types::Credentials;
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use aws_sdk_dynamodb::Client;
use aws_sdk_dynamodb::config::Builder as DynamoConfigBuilder;
use aws_sdk_dynamodb::config::retry::RetryConfig;
use snafu::ResultExt;
use std::env;

use crate::error::{Error, Result};

#[derive(Debug, Clone)]
pub struct DynamoDbConfig {
pub table_name: String,
Expand Down Expand Up @@ -40,11 +41,28 @@ impl DynamoDbConfig {
loader = loader.endpoint_url(endpoint);
}

let access_key = required_env("AWS_ACCESS_KEY_ID")?;
let secret_key = required_env("AWS_SECRET_ACCESS_KEY")?;
let creds = Credentials::from_keys(access_key, secret_key, None);
loader = loader.credentials_provider(SharedCredentialsProvider::new(creds));
let creds = if let (Ok(access_key), Ok(secret_key)) = (
env::var("AWS_DDB_ACCESS_KEY_ID"),
env::var("AWS_DDB_SECRET_ACCESS_KEY"),
) {
let token = env::var("AWS_DDB_SESSION_TOKEN").ok();
Credentials::from_keys(access_key, secret_key, token)
} else {
// Default AWS Credential Provider Chain
// Resolution order:
// 1. Environment variables
// 2. Shared config (`~/.aws/config`, `~/.aws/credentials`)
// 3. Web Identity Tokens
// 4. ECS (IAM Roles for Tasks) & General HTTP credentials
// 5. EC2 IMDSv2
let provider = CredentialsProviderChain::default_provider().await;
provider
.provide_credentials()
.await
.context(DynamoDbCredentialsSnafu)?
};

loader = loader.credentials_provider(SharedCredentialsProvider::new(creds));
let config = loader.load().await;
let retry_config = RetryConfig::adaptive();
let config_builder = DynamoConfigBuilder::from(&config).retry_config(retry_config);
Expand Down
7 changes: 7 additions & 0 deletions crates/state-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Dynamodb query error: {error}"))]
DynamoDbCredentialsError {
#[snafu(source(from(aws_credential_types::provider::error::CredentialsError, Box::new)))]
error: Box<aws_credential_types::provider::error::CredentialsError>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("item not found"))]
NotFound,
#[snafu(display("data attribute missing from DynamoDB item"))]
Expand Down