Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@ public function extract(FlowContext $context) : \Generator

while (true) {
$cursor = $this->client->cursor(fetch($cursorName)->forward($this->fetchSize));
$hasRows = false;
$rowCount = $cursor->count();

if ($rowCount === 0) {
$cursor->free();

break;
}

foreach ($cursor->iterate() as $row) {
$hasRows = true;
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);

if ($signal === Signal::STOP) {
Expand All @@ -84,7 +89,7 @@ public function extract(FlowContext $context) : \Generator

$cursor->free();

if (!$hasRows) {
if ($rowCount < $this->fetchSize) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,165 @@
namespace Flow\ETL\Adapter\PostgreSql\Tests\Unit;

use Flow\ETL\Adapter\PostgreSql\PostgreSqlCursorExtractor;
use Flow\ETL\{Config, FlowContext, Schema};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Schema;
use Flow\ETL\Tests\FlowTestCase;
use Flow\PostgreSql\Client\Client;
use Flow\PostgreSql\Client\{Client, Cursor};
use PHPUnit\Framework\MockObject\MockObject;

final class PostgreSqlCursorExtractorTest extends FlowTestCase
{
public function test_cursor_loop_breaks_immediately_when_empty_result() : void
{
$client = $this->createClientMock();
$cursor = $this->createCursorMock(rows: [], count: 0);

$client->expects(self::once())
->method('getTransactionNestingLevel')
->willReturn(1);

$client->expects(self::exactly(2))
->method('execute');

$client->expects(self::once())
->method('cursor')
->willReturn($cursor);

$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
$extractor = $extractor->withFetchSize(10);

$rows = [];

foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
$rows[] = $rowsData;
}

self::assertSame([], $rows);
}

public function test_cursor_loop_breaks_when_rows_less_than_fetch_size() : void
{
$client = $this->createClientMock();

$cursor = $this->createCursorMock(
rows: [
['id' => 1, 'name' => 'User 1'],
['id' => 2, 'name' => 'User 2'],
['id' => 3, 'name' => 'User 3'],
],
count: 3
);

$client->expects(self::once())
->method('getTransactionNestingLevel')
->willReturn(1);

$client->expects(self::exactly(2))
->method('execute');

$client->expects(self::once())
->method('cursor')
->willReturn($cursor);

$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
$extractor = $extractor->withFetchSize(10);

$rows = [];

foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
$rows = [...$rows, ...$rowsData->toArray()];
}

self::assertCount(3, $rows);
}

public function test_cursor_loop_fetches_multiple_batches_when_needed() : void
{
$client = $this->createClientMock();

$cursor1 = $this->createCursorMock(
rows: [
['id' => 1, 'name' => 'User 1'],
['id' => 2, 'name' => 'User 2'],
],
count: 2
);

$cursor2 = $this->createCursorMock(
rows: [
['id' => 3, 'name' => 'User 3'],
],
count: 1
);

$client->expects(self::once())
->method('getTransactionNestingLevel')
->willReturn(1);

$client->expects(self::exactly(2))
->method('execute');

$client->expects(self::exactly(2))
->method('cursor')
->willReturnOnConsecutiveCalls($cursor1, $cursor2);

$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
$extractor = $extractor->withFetchSize(2);

$rows = [];

foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
$rows = [...$rows, ...$rowsData->toArray()];
}

self::assertCount(3, $rows);
}

public function test_cursor_loop_with_exact_fetch_size_multiple_does_extra_fetch() : void
{
$client = $this->createClientMock();

$cursor1 = $this->createCursorMock(
rows: [
['id' => 1, 'name' => 'User 1'],
['id' => 2, 'name' => 'User 2'],
],
count: 2
);

$cursor2 = $this->createCursorMock(
rows: [
['id' => 3, 'name' => 'User 3'],
['id' => 4, 'name' => 'User 4'],
],
count: 2
);

$cursor3 = $this->createCursorMock(rows: [], count: 0);

$client->expects(self::once())
->method('getTransactionNestingLevel')
->willReturn(1);

$client->expects(self::exactly(2))
->method('execute');

$client->expects(self::exactly(3))
->method('cursor')
->willReturnOnConsecutiveCalls($cursor1, $cursor2, $cursor3);

$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
$extractor = $extractor->withFetchSize(2);

$rows = [];

foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
$rows = [...$rows, ...$rowsData->toArray()];
}

self::assertCount(4, $rows);
}

public function test_with_fetch_size_returns_self() : void
{
$client = $this->createClientMock();
Expand Down Expand Up @@ -95,4 +246,35 @@ private function createClientMock() : Client
{
return $this->createMock(Client::class);
}

/**
* @param array<array<string, mixed>> $rows
*
* @return Cursor&MockObject
*/
private function createCursorMock(array $rows, int $count) : Cursor
{
$cursor = $this->createMock(Cursor::class);

$cursor->expects(self::once())
->method('count')
->willReturn($count);

$cursor->method('iterate')
->willReturnCallback(function () use ($rows) : \Generator {
foreach ($rows as $row) {
yield $row;
}
});

$cursor->expects(self::once())
->method('free');

return $cursor;
}

private function createFlowContext() : FlowContext
{
return new FlowContext(Config::default());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,66 @@ public static function forEstimate() : self
);
}

/**
* @param array{
* analyze: bool,
* verbose: bool,
* costs: bool,
* buffers: bool,
* timing: bool,
* summary: bool,
* memory: bool,
* settings: bool,
* wal: bool,
* format: string
* } $data
*/
public static function fromArray(array $data) : self
{
return new self(
analyze: $data['analyze'],
verbose: $data['verbose'],
costs: $data['costs'],
buffers: $data['buffers'],
timing: $data['timing'],
summary: $data['summary'],
memory: $data['memory'],
settings: $data['settings'],
wal: $data['wal'],
format: ExplainFormat::from($data['format']),
);
}

/**
* @return array{
* analyze: bool,
* verbose: bool,
* costs: bool,
* buffers: bool,
* timing: bool,
* summary: bool,
* memory: bool,
* settings: bool,
* wal: bool,
* format: string
* }
*/
public function normalize() : array
{
return [
'analyze' => $this->analyze,
'verbose' => $this->verbose,
'costs' => $this->costs,
'buffers' => $this->buffers,
'timing' => $this->timing,
'summary' => $this->summary,
'memory' => $this->memory,
'settings' => $this->settings,
'wal' => $this->wal,
'format' => $this->format->value,
];
}

public function withAnalyze() : self
{
return new self(
Expand Down
41 changes: 41 additions & 0 deletions src/lib/postgresql/src/Flow/PostgreSql/Explain/Plan/Buffers.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Flow\PostgreSql\Explain\Plan;

/**
* @phpstan-type BuffersShape = array{shared_hit: int, shared_read: int, shared_dirtied: int, shared_written: int, local_hit: int, local_read: int, local_dirtied: int, local_written: int, temp_read: int, temp_written: int}
*/
final readonly class Buffers
{
public function __construct(
Expand All @@ -20,6 +23,25 @@ public function __construct(
) {
}

/**
* @param BuffersShape $data
*/
public static function fromArray(array $data) : self
{
return new self(
sharedHit: $data['shared_hit'],
sharedRead: $data['shared_read'],
sharedDirtied: $data['shared_dirtied'],
sharedWritten: $data['shared_written'],
localHit: $data['local_hit'],
localRead: $data['local_read'],
localDirtied: $data['local_dirtied'],
localWritten: $data['local_written'],
tempRead: $data['temp_read'],
tempWritten: $data['temp_written'],
);
}

public function hasDiskSpill() : bool
{
return $this->tempRead > 0 || $this->tempWritten > 0;
Expand Down Expand Up @@ -52,6 +74,25 @@ public function localWritten() : int
return $this->localWritten;
}

/**
* @return BuffersShape
*/
public function normalize() : array
{
return [
'shared_hit' => $this->sharedHit,
'shared_read' => $this->sharedRead,
'shared_dirtied' => $this->sharedDirtied,
'shared_written' => $this->sharedWritten,
'local_hit' => $this->localHit,
'local_read' => $this->localRead,
'local_dirtied' => $this->localDirtied,
'local_written' => $this->localWritten,
'temp_read' => $this->tempRead,
'temp_written' => $this->tempWritten,
];
}

public function sharedDirtied() : int
{
return $this->sharedDirtied;
Expand Down
22 changes: 22 additions & 0 deletions src/lib/postgresql/src/Flow/PostgreSql/Explain/Plan/Cost.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,33 @@ public function __construct(
) {
}

/**
* @param array{startup_cost: float, total_cost: float} $data
*/
public static function fromArray(array $data) : self
{
return new self(
startupCost: $data['startup_cost'],
totalCost: $data['total_cost'],
);
}

public function incrementalCost() : float
{
return $this->totalCost - $this->startupCost;
}

/**
* @return array{startup_cost: float, total_cost: float}
*/
public function normalize() : array
{
return [
'startup_cost' => $this->startupCost,
'total_cost' => $this->totalCost,
];
}

public function startupCost() : float
{
return $this->startupCost;
Expand Down
Loading
Loading