Skip to content

Commit

Permalink
Added offset option to parquet extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Jan 29, 2024
1 parent 13c319f commit 3259835
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\Extractor\FileExtractor;
use Flow\ETL\Extractor\Limitable;
Expand Down Expand Up @@ -32,17 +33,22 @@ public function __construct(
private readonly Path $path,
private readonly Options $options,
private readonly ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN,
private readonly array $columns = []
private readonly array $columns = [],
private readonly ?int $offset = null
) {
$this->resetLimit();

if ($this->path->isPattern() && $this->offset !== null) {
throw new InvalidArgumentException('Offset can be used only with single file path, not with pattern');
}
}

public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($this->readers($context) as $fileData) {
foreach ($fileData['file']->values($this->columns, $this->limit()) as $row) {
foreach ($fileData['file']->values($this->columns, $this->limit(), $this->offset) as $row) {
if ($shouldPutInputIntoRows) {
$row['_input_file_uri'] = $fileData['uri'];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
use Flow\ETL\Flow;
use Flow\ETL\FlowContext;
use Flow\Parquet\Options;
use Flow\Parquet\Reader;
use PHPUnit\Framework\TestCase;

final class ParquetExtractorTest extends TestCase
{
public function test_limit() : void
{
$path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.csv';
$path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.parquet';

if (\file_exists($path)) {
\unlink($path);
Expand All @@ -36,9 +37,25 @@ public function test_limit() : void
);
}

/**
 * Reads the orders fixture with an offset set to (total rows - 100) and
 * asserts that the extractor yields exactly 100 items.
 */
public function test_reading_file_from_given_offset() : void
{
    $fixture = __DIR__ . '/../Fixtures/orders_flow.parquet';
    $expectedCount = 100;

    // The total row count is taken from the file metadata, so the offset
    // is derived from the fixture itself rather than hard-coded.
    $metadata = (new Reader())->read($fixture)->metadata();
    $startAt = $metadata->rowsNumber() - $expectedCount;

    $extractor = new ParquetExtractor(Path::realpath($fixture), Options::default(), offset: $startAt);
    $extracted = \iterator_to_array($extractor->extract(new FlowContext(Config::default())));

    $this->assertCount($expectedCount, $extracted);
}

public function test_signal_stop() : void
{
$path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.csv';
$path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.parquet';

if (\file_exists($path)) {
\unlink($path);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Parquet\Tests\Unit;

use Flow\ETL\Adapter\Parquet\ParquetExtractor;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Filesystem\Path;
use Flow\Parquet\Options;
use PHPUnit\Framework\TestCase;

/**
 * Unit tests for argument validation performed when constructing a ParquetExtractor.
 */
final class ParquetExtractorTest extends TestCase
{
    /**
     * Constructing the extractor with a pattern path ('/tmp/*.parquet') and an
     * offset is expected to throw an InvalidArgumentException with a fixed message.
     */
    public function test_using_offset_with_pattern_path() : void
    {
        // Expectations must be registered before the constructor call that throws.
        $this->expectExceptionMessage('Offset can be used only with single file path, not with pattern');
        $this->expectException(InvalidArgumentException::class);

        $patternPath = new Path('/tmp/*.parquet');

        new ParquetExtractor($patternPath, Options::default(), offset: 100);
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Adapter\Parquet\Tests\Unit\ParquetFile\RowGroupBuilder;
namespace Flow\Parquet\Tests\Integration\ParquetFile\RowGroupBuilder;

use Flow\Parquet\ParquetFile\RowGroupBuilder\ColumnChunkStatistics;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
Expand Down

0 comments on commit 3259835

Please sign in to comment.