Skip to content

Commit

Permalink
Add XML writer
Browse files Browse the repository at this point in the history
  • Loading branch information
stloyd committed Jan 9, 2024
1 parent 5c4c911 commit fb235d7
Show file tree
Hide file tree
Showing 17 changed files with 1,049 additions and 16 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"webmozart/glob": "^3.0 || ^4.0"
},
"require-dev": {
"ext-xmlwriter": "*",
"aeon-php/calendar": "^1.0",
"fakerphp/faker": "^1.23",
"fig/log-test": "^1.1",
Expand Down
6 changes: 4 additions & 2 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public function load(Rows $rows, FlowContext $context) : void
Row\Entry\DateTimeEntry::class => (int) $entry->value()->format('Uu'),
Row\Entry\UuidEntry::class => $entry->value()->toString(),
Row\Entry\EnumEntry::class => $entry->value()->name,
Row\Entry\XMLEntry::class => $entry->toString(),
default => $entry->value(),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Flow\ETL\Adapter\Parquet;

use Flow\ETL\Row\Entry\UuidEntry;
use Flow\ETL\Row\Entry\XMLEntry;
use Flow\ETL\Rows;

final class RowsNormalizer
Expand All @@ -20,6 +21,7 @@ public function normalize(Rows $rows) : array
foreach ($row->entries() as $entry) {
$columns[$entry->name()] = match ($entry::class) {
UuidEntry::class => $entry->value()->toString(),
XMLEntry::class => $entry->toString(),
default => $entry->value(),
};
}
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/etl-adapter-xml/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
"ext-xmlreader": "*",
"flow-php/etl": "^0.5.0 || 1.x-dev"
},
"require-dev": {
"ext-xmlwriter": "*"
},
"config": {
"optimize-autoloader": true,
"sort-packages": true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\XML\Loader;

use Flow\ETL\Adapter\XML\RowsNormalizer;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\FileStream;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
use Flow\ETL\Rows;

/**
 * Loader writing rows into an XML file, building every row with \DOMDocument
 * and writing the serialized fragments directly to the destination stream.
 *
 * The XML declaration and the opening collection tag are written when the
 * stream is first opened; the closing collection tag is written in closure().
 */
final class DomDocumentLoader implements Closure, Loader, Loader\FileLoader
{
    /**
     * @param Path $path destination file, must not be a pattern
     * @param RowsNormalizer $normalizer converts rows into name => string maps
     * @param string $collectionName name of the element wrapping all rows
     * @param string $collectionElementName name of the element wrapping a single row
     *
     * @throws \InvalidArgumentException when the given path is a pattern
     */
    public function __construct(
        private readonly Path $path,
        private readonly RowsNormalizer $normalizer = new RowsNormalizer(),
        private readonly string $collectionName = 'rows',
        private readonly string $collectionElementName = 'row',
    ) {
        if ($this->path->isPattern()) {
            throw new \InvalidArgumentException("XMLLoader path can't be pattern, given: " . $this->path->path());
        }
    }

    /**
     * Appends the closing collection tag to every stream of the context whose
     * path has the "xml" extension, then closes the streams of this loader's path.
     */
    public function closure(FlowContext $context) : void
    {
        $closingTag = '</' . $this->collectionName . '>';

        foreach ($context->streams() as $openStream) {
            if ($openStream->path()->extension() !== 'xml') {
                continue;
            }

            \fwrite($openStream->resource(), $closingTag);
        }

        $context->streams()->close($this->path);
    }

    /**
     * Returns the path this loader was created with.
     */
    public function destination() : Path
    {
        return $this->path;
    }

    /**
     * Writes the given rows to the stream matching their partitions; the XML
     * declaration and the opening collection tag are emitted first when that
     * stream was not open yet.
     */
    public function load(Rows $rows, FlowContext $context) : void
    {
        $streams = $context->streams();
        $partitions = $rows->partitions()->toArray();

        // Has to be checked before open(), afterwards the stream is always open.
        $needsHeader = !$streams->isOpen($this->path, $partitions);

        $stream = $streams->open($this->path, 'xml', $context->appendSafe(), $partitions);

        if ($needsHeader) {
            $declaration = (new \DOMDocument('1.0', 'utf-8'))->saveXML();

            \fwrite($stream->resource(), $declaration . '<' . $this->collectionName . '>');
        }

        $this->writeXML($rows, $stream);
    }

    /**
     * Serializes each normalized row as a standalone element and writes it to the stream.
     *
     * @throws RuntimeException
     * @throws \DOMException
     */
    private function writeXML(Rows $rows, FileStream $stream) : void
    {
        foreach ($this->normalizer->normalize($rows) as $columns) {
            $document = new \DOMDocument('1.0', 'utf-8');
            $rowNode = $document->createElement($this->collectionElementName);

            foreach ($columns as $column => $text) {
                $columnNode = $document->createElement($column);
                $rowNode->appendChild($columnNode);
                // Text nodes take care of escaping special characters.
                $columnNode->appendChild($document->createTextNode($text));
            }

            $document->appendChild($rowNode);

            // Only the row element is dumped, without the XML declaration.
            $fragment = $document->saveXML($document->documentElement);

            if (!$fragment) {
                $fragment = '';
            }

            \fwrite($stream->resource(), $fragment);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\XML\Loader;

use Flow\ETL\Adapter\XML\RowsNormalizer;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
use Flow\ETL\Partition;
use Flow\ETL\Rows;

/**
 * Loader writing rows into XML files through the \XMLWriter API.
 *
 * One \XMLWriter is kept per destination file (keyed by the stream path), so
 * rows loaded in several batches, or spread across partitions, are appended to
 * the same document until closure() finishes it.
 */
final class XMLWriterLoader implements Closure, Loader, Loader\FileLoader
{
    /**
     * Writers started by this loader, indexed by the path of the file they write to.
     *
     * @var array<string, \XMLWriter>
     */
    private array $writers = [];

    /**
     * @param Path $path destination file, must not be a pattern
     * @param RowsNormalizer $normalizer converts rows into name => string maps
     * @param string $collectionName name of the element wrapping all rows
     * @param string $collectionElementName name of the element wrapping a single row
     *
     * @throws \InvalidArgumentException when the given path is a pattern
     */
    public function __construct(
        private readonly Path $path,
        private readonly RowsNormalizer $normalizer = new RowsNormalizer(),
        private readonly string $collectionName = 'rows',
        private readonly string $collectionElementName = 'row',
    ) {
        if ($this->path->isPattern()) {
            throw new \InvalidArgumentException("XMLLoader path can't be pattern, given: " . $this->path->path());
        }
    }

    /**
     * Finishes every document started by this loader and closes its streams.
     */
    public function closure(FlowContext $context) : void
    {
        // Iterate over our own writers instead of all streams of the context:
        // a stream opened by somebody else has no writer here (looking it up
        // would fail), and a destination without the "xml" extension would
        // otherwise never get its document ended.
        foreach ($this->writers as $writer) {
            // endDocument() closes all elements that are still open.
            $writer->endDocument();
            $writer->flush();
        }

        // Release the writers so a later load() starts a fresh document
        // instead of writing into an already finished one.
        $this->writers = [];

        $context->streams()->close($this->path);
    }

    /**
     * Returns the path this loader was created with.
     */
    public function destination() : Path
    {
        return $this->path;
    }

    /**
     * Appends the given rows to the document matching their partitions.
     */
    public function load(Rows $rows, FlowContext $context) : void
    {
        $this->write($rows, $rows->partitions()->toArray(), $context);
    }

    /**
     * Writes normalized rows as elements, starting the document first when no
     * writer exists yet for the resolved stream path.
     *
     * @param array<Partition> $partitions
     *
     * @throws \Flow\ETL\Exception\RuntimeException when the destination cannot be opened for writing
     */
    private function write(Rows $rows, array $partitions, FlowContext $context) : void
    {
        $stream = $context->streams()->open($this->path, 'xml', $context->appendSafe(), $partitions);
        $uri = $stream->path()->path();

        if (!\array_key_exists($uri, $this->writers)) {
            $writer = new \XMLWriter();

            // openUri() reports failure through its return value; without this
            // check every following write would be silently discarded.
            if (!$writer->openUri($uri)) {
                throw new \Flow\ETL\Exception\RuntimeException("XMLLoader can't open \"{$uri}\" for writing");
            }

            $writer->startDocument('1.0', 'UTF-8');
            $writer->startElement($this->collectionName);

            $this->writers[$uri] = $writer;
        }

        $writer = $this->writers[$uri];

        foreach ($this->normalizer->normalize($rows) as $row) {
            $writer->startElement($this->collectionElementName);

            foreach ($row as $name => $value) {
                $writer->writeElement($name, $value);
            }

            $writer->endElement();
        }

        // Push the buffered output to the file after every batch.
        $writer->flush();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Adapter\XML;

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Row\Entry\ArrayEntry;
use Flow\ETL\Row\Entry\ListEntry;
use Flow\ETL\Row\Entry\MapEntry;
use Flow\ETL\Row\Entry\ObjectEntry;
use Flow\ETL\Row\Entry\StructureEntry;
use Flow\ETL\Rows;

/**
 * Converts rows into flat maps of entry name => string value for XML loaders.
 */
final class RowsNormalizer
{
    /**
     * Lazily normalizes rows, yielding one array per row keyed by entry name.
     *
     * Array, list, map and structure entries are rejected. Object entries are
     * accepted only when their value is \Stringable. Every other entry is
     * represented by the result of its toString() method.
     *
     * @throws RuntimeException when an entry can't be represented as a string value
     *
     * @return \Generator<mixed, array<string, string>>
     */
    public function normalize(Rows $rows) : \Generator
    {
        $unsupportedTypes = [
            ArrayEntry::class,
            ListEntry::class,
            MapEntry::class,
            StructureEntry::class,
        ];

        foreach ($rows as $row) {
            $normalized = [];

            foreach ($row->entries() as $entry) {
                $entryType = $entry::class;

                if (\in_array($entryType, $unsupportedTypes, true)) {
                    throw new RuntimeException('Entry of type ' . $entryType . ' cannot be normalized to XML values.');
                }

                if ($entryType !== ObjectEntry::class) {
                    $normalized[$entry->name()] = $entry->toString();

                    continue;
                }

                $object = $entry->value();

                if (!$object instanceof \Stringable) {
                    throw new RuntimeException('Entry of type ' . \get_class($object) . ' cannot be normalized to XML values.');
                }

                $normalized[$entry->name()] = (string) $object;
            }

            yield $normalized;
        }
    }
}
15 changes: 12 additions & 3 deletions src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
namespace Flow\ETL\Adapter\XML;

use function Flow\ETL\DSL\from_all;
use Flow\ETL\Adapter\XML\Loader\DomDocumentLoader;
use Flow\ETL\Adapter\XML\Loader\XMLWriterLoader;
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;

/**
* @param array<Path|string>|Path|string $path
* @param string $xml_node_path
*
* @return Extractor
*/
function from_xml(
string|Path|array $path,
Expand All @@ -37,3 +37,12 @@ function from_xml(
$xml_node_path
);
}

/**
 * Creates a loader writing rows to an XML file under the given path.
 *
 * XMLWriterLoader is returned when the \XMLWriter class exists, otherwise
 * DomDocumentLoader is used. A string path is converted with Path::realpath().
 */
function to_xml(string|Path $path) : Loader
{
    $destination = $path instanceof Path ? $path : Path::realpath($path);

    return \class_exists(\XMLWriter::class)
        ? new XMLWriterLoader($destination)
        : new DomDocumentLoader($destination);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php declare(strict_types=1);

namespace Flow\ETL\Adapter\XML\Tests\Benchmark\Loader;

use function Flow\ETL\Adapter\Xml\from_xml;
use Flow\ETL\Adapter\XML\Loader\DomDocumentLoader;
use Flow\ETL\Config;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\FlowContext;
use Flow\ETL\Rows;
use PhpBench\Attributes\Groups;

/**
 * Benchmark of DomDocumentLoader: loads rows extracted from the
 * flow_orders.xml fixture into a temporary XML file.
 */
#[Groups(['loader'])]
final class DomDocumentLoaderBench
{
    private readonly FlowContext $context;

    private readonly string $outputPath;

    private Rows $rows;

    /**
     * Prepares the context, a unique output path and the rows to load.
     *
     * @throws \RuntimeException when a temporary file name can't be generated
     */
    public function __construct()
    {
        $this->context = new FlowContext(Config::default());

        $placeholder = \tempnam(\sys_get_temp_dir(), 'etl_xml_loader_bench');

        if ($placeholder === false) {
            throw new \RuntimeException('Benchmark failed, temporary file name could not be generated');
        }

        // tempnam() creates an empty file, but the benchmark writes to the
        // same name with ".xml" appended; remove the placeholder right away
        // so it is not left behind in the temporary directory.
        \unlink($placeholder);

        $this->outputPath = $placeholder . '.xml';
        $this->rows = new Rows();

        // Merge all extracted batches so the benchmark measures a single load() call.
        foreach (from_xml(__DIR__ . '/../Fixtures/flow_orders.xml')->extract($this->context) as $rows) {
            $this->rows = $this->rows->merge($rows);
        }
    }

    /**
     * Verifies that the benchmark produced the output file and removes it.
     *
     * @throws \RuntimeException when the output file doesn't exist
     */
    public function __destruct()
    {
        if (!\file_exists($this->outputPath)) {
            throw new \RuntimeException("Benchmark failed, \"{$this->outputPath}\" doesn't exist");
        }

        \unlink($this->outputPath);
    }

    /**
     * Loads the prepared rows into the output file with DomDocumentLoader.
     */
    public function bench_load_10k() : void
    {
        $loader = new DomDocumentLoader(Path::realpath($this->outputPath));
        $loader->load($this->rows, $this->context);
    }
}
Loading

0 comments on commit fb235d7

Please sign in to comment.