Skip to content

Commit

Permalink
Added StdOut Filesystem with stdout protocol (#1233)
Browse files Browse the repository at this point in the history
Added StdOut Filesystem with stdout protocol
  • Loading branch information
norberttech authored Sep 26, 2024
1 parent 7f5b831 commit 6999a0e
Show file tree
Hide file tree
Showing 33 changed files with 433 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function closure(FlowContext $context) : void

$output->append($content);

$context->streams()->closeWriters($this->output);
$context->streams()->closeStreams($this->output);
}

if ($this->outputVar !== null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public function extract(FlowContext $context) : \Generator
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);

return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function __construct(

public function closure(FlowContext $context) : void
{
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);
}

public function destination() : Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public function extract(FlowContext $context) : \Generator
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);

return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ public function __construct(private readonly Path $path)

public function closure(FlowContext $context) : void
{
foreach ($context->streams() as $stream) {
if ($stream->path()->extension() === 'json') {
$stream->append($this->putRowsInNewLines ? "\n]" : ']');
}
foreach ($context->streams()->listOpenStreams($this->path) as $stream) {
$stream->append($this->putRowsInNewLines ? "\n]" : ']');
}

$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);
}

public function destination() : Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Flow\ETL\Adapter\JSON\JSONMachine\JsonExtractor;
use Flow\ETL\Row\Schema;
use Flow\ETL\{Attribute\DocumentationDSL, Attribute\DocumentationExample, Attribute\Module, Attribute\Type, Loader};
use Flow\ETL\{Attribute\DocumentationDSL, Attribute\DocumentationExample, Attribute\Module, Attribute\Type};
use Flow\Filesystem\Path;

/**
Expand Down Expand Up @@ -40,15 +40,15 @@ function from_json(
* @param string $date_time_format - format for DateTimeInterface::format() - @deprecate use withDateTimeFormat method instead
* @param bool $put_rows_in_new_lines - if you want to put each row in a new line - @deprecate use withRowsInNewLines method instead
*
* @return Loader
* @return JsonLoader
*/
#[DocumentationDSL(module: Module::JSON, type: Type::LOADER)]
function to_json(
string|Path $path,
int $flags = JSON_THROW_ON_ERROR,
string $date_time_format = \DateTimeInterface::ATOM,
bool $put_rows_in_new_lines = false,
) : Loader {
) : JsonLoader {
return (new JsonLoader(\is_string($path) ? Path::realpath($path) : $path))
->withFlags($flags)
->withDateTimeFormat($date_time_format)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ final class JsonTest extends TestCase
{
public function test_json_loader() : void
{

df()
->read(new FakeExtractor(100))
->write(to_json($path = __DIR__ . '/var/test_json_loader.json'))
Expand Down Expand Up @@ -81,6 +80,38 @@ public function test_json_loader_overwrite_mode() : void
}
}

/**
 * Writes a 12-row dataset to JSON partitioned by "size" and "color", then reads every
 * produced partition file back and expects the original rows once sorted by "id".
 */
public function test_partitioning_json_file() : void
{
    // Single source of truth for the output location, shared by the write and the read below.
    $directory = __DIR__ . '/var/test_partitioning_json_file';

    $dataset = [
        ['id' => 1, 'color' => 'red', 'size' => 'small'],
        ['id' => 2, 'color' => 'blue', 'size' => 'medium'],
        ['id' => 3, 'color' => 'green', 'size' => 'large'],
        ['id' => 4, 'color' => 'yellow', 'size' => 'small'],
        ['id' => 5, 'color' => 'black', 'size' => 'medium'],
        ['id' => 6, 'color' => 'white', 'size' => 'large'],
        ['id' => 7, 'color' => 'red', 'size' => 'small'],
        ['id' => 8, 'color' => 'blue', 'size' => 'medium'],
        ['id' => 9, 'color' => 'green', 'size' => 'large'],
        ['id' => 10, 'color' => 'yellow', 'size' => 'small'],
        ['id' => 11, 'color' => 'black', 'size' => 'medium'],
        ['id' => 12, 'color' => 'white', 'size' => 'large'],
    ];

    df()
        ->read(from_array($dataset))
        ->saveMode(overwrite())
        ->partitionBy('size', 'color')
        ->write(to_json($directory . '/products.json'))
        ->run();

    self::assertEquals(
        $dataset,
        df()
            // Glob across every partition directory created by the write above.
            ->read(from_json($directory . '/**/*.json'))
            // Sort by id so the comparison does not depend on the order files are read in.
            ->sortBy(ref('id')->asc())
            ->fetch()
            ->toArray()
    );
}

public function test_putting_each_row_in_a_new_line() : void
{
df()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public function extract(FlowContext $context) : \Generator
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);

return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function closure(FlowContext $context) : void
}
}

$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);
$this->writers = [];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function extract(FlowContext $context) : \Generator
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);

return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function __construct(private readonly Path $path)

public function closure(FlowContext $context) : void
{
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);
}

public function destination() : Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ public function __construct(

public function closure(FlowContext $context) : void
{
foreach ($context->streams() as $stream) {
if ($stream->path()->extension() === 'xml') {
$stream->append('</' . $this->rootElementName . '>');
}
foreach ($context->streams()->listOpenStreams($this->path) as $stream) {
$stream->append('</' . $this->rootElementName . '>');
}

$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);
}

public function destination() : Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public function extract(FlowContext $context) : \Generator
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);
$this->freeParser();

return;
Expand Down Expand Up @@ -138,7 +138,7 @@ public function extract(FlowContext $context) : \Generator
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);
$this->freeParser();

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public function extract(FlowContext $context) : \Generator

if ($signal === Signal::STOP || $this->reachedLimit()) {
$xmlReader->close();
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);

return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,49 @@
namespace Flow\ETL\Adapter\XML\Tests\Integration\Loader;

use function Flow\ETL\Adapter\XML\{from_xml, to_xml};
use function Flow\ETL\DSL\{df, from_array, overwrite};
use function Flow\ETL\DSL\{df, from_array, overwrite, ref};
use Flow\ETL\Tests\Double\FakeExtractor;
use Flow\ETL\Tests\Integration\IntegrationTestCase;

final class XMLLoaderTest extends IntegrationTestCase
{
/**
 * Writes a 12-row dataset to XML partitioned by "size" and "color" (one row per batch),
 * then reads every produced partition file back, rebuilds the scalar columns from the
 * XML nodes and expects the original rows once sorted by "id".
 */
public function test_partitioning_xml_file() : void
{
    // Single source of truth for the output location, shared by the write and the read below.
    $directory = __DIR__ . '/var/test_partitioning_xml_file';

    $dataset = [
        ['id' => 1, 'color' => 'red', 'size' => 'small'],
        ['id' => 2, 'color' => 'blue', 'size' => 'medium'],
        ['id' => 3, 'color' => 'green', 'size' => 'large'],
        ['id' => 4, 'color' => 'yellow', 'size' => 'small'],
        ['id' => 5, 'color' => 'black', 'size' => 'medium'],
        ['id' => 6, 'color' => 'white', 'size' => 'large'],
        ['id' => 7, 'color' => 'red', 'size' => 'small'],
        ['id' => 8, 'color' => 'blue', 'size' => 'medium'],
        ['id' => 9, 'color' => 'green', 'size' => 'large'],
        ['id' => 10, 'color' => 'yellow', 'size' => 'small'],
        ['id' => 11, 'color' => 'black', 'size' => 'medium'],
        ['id' => 12, 'color' => 'white', 'size' => 'large'],
    ];

    df()
        ->read(from_array($dataset))
        ->saveMode(overwrite())
        ->batchSize(1)
        ->partitionBy('size', 'color')
        ->write(to_xml($directory . '/products.xml'))
        ->run();

    self::assertEquals(
        $dataset,
        df()
            // Glob across every partition directory created by the write above.
            ->read(from_xml($directory . '/**/*.xml')->withXMLNodePath('rows/row'))
            // Rebuild the flat columns from each extracted "node" entry, then discard the node.
            ->withEntry('id', ref('node')->xpath('id')->domElementValue()->cast('int'))
            ->withEntry('color', ref('node')->xpath('color')->domElementValue())
            ->withEntry('size', ref('node')->xpath('size')->domElementValue())
            ->drop('node')
            // Sort by id so the comparison does not depend on the order files are read in.
            ->sortBy(ref('id')->asc())
            ->fetch()
            ->toArray()
    );
}

public function test_writing_empty_rows() : void
{
df()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public function extract(FlowContext $context) : \Generator
$this->incrementReturnedRows();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->closeWriters($this->path);
$context->streams()->closeStreams($this->path);

return;
}
Expand Down
25 changes: 24 additions & 1 deletion src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function __construct(private readonly FilesystemTable $fstab)
$this->saveMode = SaveMode::ExceptionIfExists;
}

public function closeWriters(Path $path) : void
public function closeStreams(Path $path) : void
{
$streams = [];

Expand Down Expand Up @@ -125,6 +125,21 @@ public function list(Path $path, Filter $pathFilter) : \Generator
}
}

/**
 * Lazily yields the writing streams registered under the given path that are still open.
 *
 * Streams are looked up in $this->writingStreams using the path URI as the key. When
 * nothing is registered under that URI the generator completes without yielding.
 *
 * @param Path $path path whose URI is used as the lookup key
 *
 * @return \Generator yields each registered stream for which isOpen() returns true
 */
public function listOpenStreams(Path $path) : \Generator
{
    $uri = $path->uri();

    if (!\array_key_exists($uri, $this->writingStreams)) {
        // A bare return simply ends the generator. `return []` would not produce an
        // empty list for callers; it would only set the generator's return value.
        return;
    }

    foreach ($this->writingStreams[$uri] as $stream) {
        if ($stream->isOpen()) {
            yield $stream;
        }
    }
}

public function read(Path $path, array $partitions = []) : SourceStream
{
if ($path->isPattern()) {
Expand Down Expand Up @@ -188,6 +203,14 @@ public function writeTo(Path $path, array $partitions = []) : DestinationStream
$destinationPathUri = $destination->uri();

if (!\array_key_exists($destinationPathUri, $this->writingStreams[$pathUri])) {
if ($path->protocol()->is('stdout') && \count($this->writingStreams) > 0) {
foreach ($this->getIterator() as $writingStream) {
if ($writingStream->path()->protocol()->is('stdout')) {
throw new RuntimeException('Only one stdout filesystem stream can be open at the same time');
}
}
}

$fs = $this->fstab->for($path);

$outputPath = $destination;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Tests\Integration\Filesystem\FilesystemStreams;

use function Flow\Filesystem\DSL\path_stdout;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\Filesystem\Partition;
use Flow\Filesystem\Path\Filter\KeepAll;
Expand All @@ -26,10 +27,20 @@ public function test_is_open_for_writing() : void
$streams->writeTo($this->getPath(__FUNCTION__ . '/file.txt'));
self::assertTrue($streams->isOpen($this->getPath(__FUNCTION__ . '/file.txt')));
self::assertCount(1, $streams);
$streams->closeWriters($this->getPath(__FUNCTION__ . '/file.txt'));
$streams->closeStreams($this->getPath(__FUNCTION__ . '/file.txt'));
self::assertFalse($streams->isOpen($this->getPath(__FUNCTION__ . '/file.txt')));
}

/**
 * Requests two stdout write streams from the same FilesystemStreams instance and expects
 * a failure carrying the "Only one stdout filesystem stream..." message.
 */
public function test_open_two_write_streams_to_stdout() : void
{
// The expectation is registered up front, before the calls that are expected to throw.
$this->expectExceptionMessage('Only one stdout filesystem stream can be open at the same time');

$streams = $this->streams();
// First stdout destination requested from this streams instance.
$streams->writeTo(path_stdout('json'));
// NOTE(review): presumably this second request is the one that throws — confirm against FilesystemStreams::writeTo.
$streams->writeTo(path_stdout('json'));

}

public function test_read() : void
{
$this->setupFiles([
Expand Down Expand Up @@ -115,6 +126,14 @@ public function test_scan() : void
);
}

/**
 * After a single writeTo() call for a stdout path, the streams collection counts one stream.
 */
public function test_write_to_stdout() : void
{
    $filesystemStreams = $this->streams();

    // Request exactly one stdout destination.
    $filesystemStreams->writeTo(path_stdout('json'));

    self::assertCount(1, $filesystemStreams);
}

protected function streams() : FilesystemStreams
{
return new FilesystemStreams($this->fstab());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public function test_open_stream_for_existing_file() : void

$appendFileStream = $streams->writeTo($file);
$appendFileStream->append('new content');
$streams->closeWriters($file);
$streams->closeStreams($file);

$files = \iterator_to_array($this->fs()->list(new Path($file->parentDirectory()->path() . '/*')));

Expand All @@ -53,7 +53,7 @@ public function test_open_stream_for_non_existing_file() : void

$appendFileStream = $streams->writeTo($file);
$appendFileStream->append('new content');
$streams->closeWriters($file);
$streams->closeStreams($file);

$files = \iterator_to_array($this->fs()->list(new Path($file->parentDirectory()->path() . '/*')));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function test_open_stream_for_non_existing_file() : void

$fileStream = $streams->writeTo($file);
$fileStream->append('some content');
$streams->closeWriters($file);
$streams->closeStreams($file);

self::assertFileExists($file->path());
self::assertSame('some content', \file_get_contents($file->path()));
Expand Down
Loading

0 comments on commit 6999a0e

Please sign in to comment.