Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 5 additions & 28 deletions .github/workflows/job-benchmark-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,34 +55,11 @@ jobs:
- name: "Execute benchmarks"
id: init_comment
run: |
# Run all benchmarks in parallel
composer test:benchmark:extractor -- --ref=1.x --progress=none --iterations=1 > ./var/phpbench/extractor.txt 2>&1 &
PID_EXTRACTOR=$!

composer test:benchmark:transformer -- --ref=1.x --progress=none --iterations=1 > ./var/phpbench/transformer.txt 2>&1 &
PID_TRANSFORMER=$!

composer test:benchmark:loader -- --ref=1.x --progress=none --iterations=1 > ./var/phpbench/loader.txt 2>&1 &
PID_LOADER=$!

composer test:benchmark:building_blocks -- --ref=1.x --progress=none --iterations=1 > ./var/phpbench/building_blocks.txt 2>&1 &
PID_BUILDING=$!

composer test:benchmark:parquet-library -- --ref=1.x --progress=none --iterations=1 > ./var/phpbench/parquet.txt 2>&1 &
PID_PARQUET=$!

# Wait for all to complete and capture exit codes
EXIT_CODE=0
wait $PID_EXTRACTOR || EXIT_CODE=$?
wait $PID_TRANSFORMER || EXIT_CODE=$?
wait $PID_LOADER || EXIT_CODE=$?
wait $PID_BUILDING || EXIT_CODE=$?
wait $PID_PARQUET || EXIT_CODE=$?

if [ $EXIT_CODE -ne 0 ]; then
echo "One or more benchmarks failed with exit code $EXIT_CODE"
exit $EXIT_CODE
fi
composer test:benchmark:extractor -- --ref=1.x --progress=none > ./var/phpbench/extractor.txt
composer test:benchmark:transformer -- --ref=1.x --progress=none > ./var/phpbench/transformer.txt
composer test:benchmark:loader -- --ref=1.x --progress=none > ./var/phpbench/loader.txt
composer test:benchmark:building_blocks -- --ref=1.x --progress=none > ./var/phpbench/building_blocks.txt
composer test:benchmark:parquet-library -- --ref=1.x --progress=none > ./var/phpbench/parquet.txt

# Build the summary file
{
Expand Down
28 changes: 4 additions & 24 deletions documentation/components/adapters/doctrine.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,13 @@ data_frame()
// Types are automatically detected from the Flow Schema
```

#### Manual Type Override
#### Custom Types Map

You can override specific column types for fine-grained control:
For advanced scenarios, you can provide a custom type mapping to control how Flow types are converted to DBAL types:

```php
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;

data_frame()
->read(from_())
->write(to_dbal_table_insert($connection, 'users')
->withColumnTypes([
'id' => Type::getType(Types::INTEGER),
'email' => Type::getType(Types::STRING),
'created_at' => Type::getType(Types::DATETIME_IMMUTABLE),
]))
->run();
```

#### Custom Type Detector

For advanced scenarios, you can provide a custom type detector with your own type mapping:

```php
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use Flow\ETL\Adapter\Doctrine\{DbalTypesDetector, TypesMap};
use Flow\ETL\Adapter\Doctrine\TypesMap;
use Flow\Types\Type\Native\StringType;
use Doctrine\DBAL\Types\TextType;

Expand All @@ -117,7 +97,7 @@ $customTypesMap = new TypesMap([
data_frame()
->read(from_())
->write(to_dbal_table_insert($connection, 'users')
->withTypesDetector(new DbalTypesDetector($customTypesMap)))
->withTypesMap($customTypesMap))
->run();
```

Expand Down
6 changes: 3 additions & 3 deletions phpbench.json.dist
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
"src/core/etl/tests/Flow/ETL/Tests/Benchmark/",
"src/lib/parquet/tests/Flow/Parquet/Tests/Benchmark/"
],
"runner.php_config": { "memory_limit": "1G" },
"runner.php_config": { "memory_limit": "2G" },
"runner.php_env": {
"PGSQL_DATABASE_URL": "pgsql://postgres:postgres@127.0.0.1:5432/postgres?serverVersion=11&charset=utf8"
},
"runner.iterations": 3,
"runner.retry_threshold": 5,
"runner.iterations": 2,
"runner.retry_threshold": 10,
"storage.xml_storage_path": "var/phpbench"
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

namespace Flow\ETL\Adapter\CSV\Tests\Benchmark;

use function Flow\ETL\Adapter\CSV\{from_csv, to_csv};
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\{config, flow_context};
use Flow\ETL\{FlowContext, Rows};
use Flow\ETL\{FlowContext, Rows, Tests\Double\FakeStaticOrdersExtractor};
use PhpBench\Attributes\Groups;

#[Groups(['loader'])]
Expand All @@ -22,11 +22,7 @@ public function __construct()
{
$this->context = flow_context(config());
$this->outputPath = \tempnam(\sys_get_temp_dir(), 'etl_csv_loader_bench') . '.csv';
$this->rows = \Flow\ETL\DSL\rows();

foreach (from_csv(__DIR__ . '/Fixtures/orders_flow.csv')->extract($this->context) as $rows) {
$this->rows = $this->rows->merge($rows);
}
$this->rows = (new FakeStaticOrdersExtractor(10_000))->toRows();
}

public function __destruct()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Flow\ETL\Adapter\Doctrine;

use Doctrine\DBAL\{Connection, DriverManager};
use Doctrine\DBAL\Types\Type;
use Flow\Doctrine\Bulk\{Bulk, BulkData, InsertOptions, UpdateOptions};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\{FlowContext, Loader, Rows};
Expand All @@ -14,18 +13,13 @@ final class DbalLoader implements Loader
{
private ?Bulk $bulk = null;

/**
* @var null|array<string, Type>
*/
private ?array $columnTypes = null;

private ?Connection $connection = null;

private string $operation = 'insert';

private InsertOptions|UpdateOptions|null $operationOptions = null;

private ?DbalTypesDetector $typesDetector = null;
private ?TypesMap $typesMap = null;

/**
* @param array<string, mixed> $connectionParams
Expand Down Expand Up @@ -65,28 +59,21 @@ public static function fromConnection(

public function load(Rows $rows, FlowContext $context) : void
{
$normalizedData = (new RowsNormalizer())->normalize($rows->sortEntries());
if ($rows->count() === 0) {
return;
}

$sortedRows = $rows->sortEntries();
$normalizedData = (new RowsNormalizer())->normalize($sortedRows);

$this->bulk()->{$this->operation}(
$this->connection(),
$this->tableName,
new BulkData($normalizedData, $this->typesDetector()->convert($rows->schema(), $this->columnTypes ?? [])),
new BulkData($normalizedData, $this->typesMap()->flowRowTypes($sortedRows->first())),
$this->operationOptions
);
}

/**
* Override types taken from Flow Schema with explicitly provided DBAL types.
*
* @param array<string, Type> $types Column name => DBAL Type instance
*/
public function withColumnTypes(array $types) : self
{
$this->columnTypes = $types;

return $this;
}

/**
* @throws InvalidArgumentException
*/
Expand All @@ -109,11 +96,11 @@ public function withOperationOptions(InsertOptions|UpdateOptions|null $operation
}

/**
* Set custom SchemaToTypesConverter with custom TypesMap.
* Set custom types map for Flow Type to DBAL Type conversion.
*/
public function withTypesDetector(DbalTypesDetector $detector) : self
public function withTypesMap(TypesMap $typesMap) : self
{
$this->typesDetector = $detector;
$this->typesMap = $typesMap;

return $this;
}
Expand All @@ -137,14 +124,8 @@ private function connection() : Connection
return $this->connection;
}

private function typesDetector() : DbalTypesDetector
private function typesMap() : TypesMap
{
if ($this->typesDetector !== null) {
return $this->typesDetector;
}

$this->typesDetector = new DbalTypesDetector();

return $this->typesDetector;
return $this->typesMap ??= new TypesMap([]);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Doctrine\DBAL\Types\{BigIntType, BlobType, DateImmutableType, DateTimeImmutableType, DateTimeTzImmutableType, DateTimeTzType, DecimalType, GuidType, SmallFloatType, SmallIntType, TextType, TimeImmutableType};
use Doctrine\DBAL\Types\Type as DbalType;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Row;
use Flow\Types\Type as FlowType;
use Flow\Types\Type\Logical\{DateTimeType,
DateType,
Expand All @@ -20,7 +21,7 @@
UuidType,
XMLElementType,
XMLType};
use Flow\Types\Type\Native\{BooleanType, FloatType, IntegerType, StringType};
use Flow\Types\Type\Native\{BooleanType, EnumType, FloatType, IntegerType, StringType};

final class TypesMap
{
Expand Down Expand Up @@ -67,6 +68,7 @@ final class TypesMap
XMLElementType::class => \Doctrine\DBAL\Types\StringType::class,
HTMLType::class => \Doctrine\DBAL\Types\StringType::class,
HTMLElementType::class => \Doctrine\DBAL\Types\StringType::class,
EnumType::class => \Doctrine\DBAL\Types\StringType::class,
ListType::class => \Doctrine\DBAL\Types\JsonType::class,
MapType::class => \Doctrine\DBAL\Types\JsonType::class,
StructureType::class => \Doctrine\DBAL\Types\JsonType::class,
Expand Down Expand Up @@ -99,6 +101,29 @@ public function __construct(array $map)
}
}

/**
 * Resolve a DBAL type instance for every entry of the given row.
 *
 * Each entry's Flow type class is translated with toDbalType(); the resulting DBAL type
 * class is then looked up among the types returned by DbalType::getTypesMap().
 *
 * @throws \InvalidArgumentException when the resolved DBAL type class is not present in the DBAL types map
 *
 * @return array<string, DbalType> Column name => DBAL Type instance
 */
public function flowRowTypes(Row $row) : array
{
    // Flip the registry once so a DBAL type class can be resolved to the name DbalType::getType() expects.
    $registeredNames = \array_flip(DbalType::getTypesMap());
    $columnTypes = [];

    foreach ($row->entries() as $entry) {
        $dbalClass = $this->toDbalType($entry->type()::class);

        if (\array_key_exists($dbalClass, $registeredNames)) {
            $columnTypes[$entry->name()] = DbalType::getType($registeredNames[$dbalClass]);

            continue;
        }

        throw new \InvalidArgumentException(\sprintf('DBAL type "%s" is not registered.', $dbalClass));
    }

    return $columnTypes;
}

/**
* @param class-string<FlowType<mixed>> $flowType
*
Expand Down
Loading
Loading