Added more commands to Flow CLI (#1246)
* Added more commands to Flow CLI

* Added a way to pass configuration to flow cli

* Added extra tests for protocol value object
norberttech authored Oct 14, 2024
1 parent 444b79f commit c269cc5
Showing 47 changed files with 272,088 additions and 303 deletions.
1 change: 1 addition & 0 deletions composer.json
@@ -141,6 +141,7 @@
"src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/functions.php",
"src/bridge/filesystem/azure/src/Flow/Filesystem/Bridge/Azure/DSL/functions.php",
"src/core/etl/src/Flow/ETL/DSL/functions.php",
"src/cli/src/Flow/CLI/DSL/functions.php",
"src/functions.php",
"src/lib/array-dot/src/Flow/ArrayDot/array_dot.php",
"src/lib/azure-sdk/src/Flow/Azure/SDK/DSL/functions.php",
211 changes: 195 additions & 16 deletions docs/components/cli/docs.md
@@ -7,15 +7,34 @@
composer require flow-php/cli
```

In some cases, it might make sense to install the CLI globally:

```
composer global require flow-php/cli
```

Now you can run the CLI using the `flow` command.

## Commands

### Config

All Flow CLI commands can be configured with the `--config` option. The option accepts a path to a PHP configuration file that returns a `Config` or `ConfigBuilder` instance.

`.flow.php`

```php
<?php

use function Flow\ETL\DSL\config_builder;

return config_builder()
->id('execution-id');
```

`flow read --config .flow.php orders.csv`

One of the most common use cases is mounting a custom filesystem into the Flow fstab to access remote files through the CLI.
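As an illustrative sketch of such a config file: the exact helper names below (`azure_filesystem` and the `mount()` builder method) are assumptions based on the Azure bridge DSL registered in `composer.json` above and may differ in your Flow version.

```php
<?php

declare(strict_types=1);

use function Flow\ETL\DSL\config_builder;
// Hypothetical Azure bridge DSL helper - verify the exact name against
// src/bridge/filesystem/azure/src/Flow/Filesystem/Bridge/Azure/DSL/functions.php.
use function Flow\Filesystem\Bridge\Azure\DSL\azure_filesystem;

// Build the blob service for your storage account here; construction details
// are omitted because they depend on the chosen authorization method.
$blobService = /* ... */ null;

// Mounting the filesystem into fstab lets CLI commands resolve remote paths.
return config_builder()
    ->mount(azure_filesystem($blobService));
```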

```shell
$ flow
Available commands:
completion Dump the shell completion script
help Display help for a command
list List commands
file
file:read [read] Read data from a file.
  file:rows:count [count]    Count rows in a file.
file:schema [schema] Read data schema from a file.
parquet
parquet:read [parquet:read:data] Read data from parquet file
parquet:read:metadata Read metadata from parquet file
pipeline
pipeline:run [run] Execute ETL pipeline from a php/json file.
```

### `file:schema`
```shell
$ flow file:schema --help
Description:
Read data schema from a file.

Usage:
file:schema [options] [--] <file>
schema

Arguments:
file Path to a file from which schema should be extracted.

Options:
      --file-format=FILE-FORMAT Source file format. When not set, the file format is guessed from the source file extension
      --file-limit=FILE-LIMIT Limit the number of rows used to infer the file schema; when not set, the whole file is analyzed
      --config=CONFIG Path to a local PHP file that MUST return an instance of: Flow\ETL\Config
--output-pretty Pretty print schema
--output-table Pretty schema as ascii table
      --schema-auto-cast[=SCHEMA-AUTO-CAST] When set, Flow will try to automatically cast values to more precise data types; for example, datetime strings will be cast to the datetime type [default: false]
--json-pointer=JSON-POINTER JSON Pointer to a subtree from which schema should be extracted
--json-pointer-entry-name When set, JSON Pointer will be used as an entry name in the schema
--csv-header[=CSV-HEADER] When set, CSV header will be used as a schema
--csv-empty-to-null[=CSV-EMPTY-TO-NULL] When set, empty CSV values will be treated as NULL values
--csv-separator[=CSV-SEPARATOR] CSV separator character
--csv-enclosure[=CSV-ENCLOSURE] CSV enclosure character
--csv-escape[=CSV-ESCAPE] CSV escape character
      --xml-node-path=XML-NODE-PATH XML node path to a subtree from which schema should be extracted, for example /root/element. This is not XPath, just node names separated by slashes
--xml-buffer-size=XML-BUFFER-SIZE XML buffer size in bytes
--parquet-columns=PARQUET-COLUMNS Columns to read from parquet file (multiple values allowed)
--parquet-offset=PARQUET-OFFSET Offset to start reading from
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
```

Example:
```shell
$ flow schema orders.csv --output-table --schema-auto-cast
| items | json | false | | [] |
+------------+----------+----------+-------------+----------+
7 rows
```

### `file:read`

```shell
$ flow read --help
Description:
Read data from a file.

Usage:
file:read [options] [--] <file>
read

Arguments:
  file Path to a file from which data should be read.

Options:
      --file-format=FILE-FORMAT File format. When not set, the file format is guessed from the source file extension
      --file-batch-size=FILE-BATCH-SIZE Number of rows read and displayed in one batch; when set to -1 the whole dataset is displayed at once [default: 100]
      --file-limit=FILE-LIMIT Limit the number of rows used to infer the file schema; when not set, the whole file is analyzed
      --config=CONFIG Path to a local PHP file that MUST return an instance of: Flow\ETL\Config
      --output-truncate=OUTPUT-TRUNCATE Truncate output to the given number of characters; when set to -1 output is not truncated at all [default: 20]
      --schema-auto-cast[=SCHEMA-AUTO-CAST] When set, Flow will try to automatically cast values to more precise data types; for example, datetime strings will be cast to the datetime type [default: false]
--json-pointer=JSON-POINTER JSON Pointer to a subtree from which schema should be extracted
--json-pointer-entry-name When set, JSON Pointer will be used as an entry name in the schema
--csv-header[=CSV-HEADER] When set, CSV header will be used as a schema
--csv-empty-to-null[=CSV-EMPTY-TO-NULL] When set, empty CSV values will be treated as NULL values
--csv-separator[=CSV-SEPARATOR] CSV separator character
--csv-enclosure[=CSV-ENCLOSURE] CSV enclosure character
--csv-escape[=CSV-ESCAPE] CSV escape character
      --xml-node-path=XML-NODE-PATH XML node path to a subtree from which schema should be extracted, for example /root/element. This is not XPath, just node names separated by slashes
--xml-buffer-size=XML-BUFFER-SIZE XML buffer size in bytes
--parquet-columns=PARQUET-COLUMNS Columns to read from parquet file (multiple values allowed)
--parquet-offset=PARQUET-OFFSET Offset to start reading from
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
```

### `file:rows:count`

```shell
$ flow file:rows:count --help
Description:
  Count rows in a file.

Usage:
file:rows:count [options] [--] <file>
count

Arguments:
  file Path to a file whose rows should be counted.

Options:
      --file-format=FILE-FORMAT Source file format. When not set, the file format is guessed from the source file extension
      --file-limit=FILE-LIMIT Limit the number of rows used to infer the file schema; when not set, the whole file is analyzed
      --config=CONFIG Path to a local PHP file that MUST return an instance of: Flow\ETL\Config
--json-pointer=JSON-POINTER JSON Pointer to a subtree from which schema should be extracted
--json-pointer-entry-name When set, JSON Pointer will be used as an entry name in the schema
--csv-header[=CSV-HEADER] When set, CSV header will be used as a schema
--csv-empty-to-null[=CSV-EMPTY-TO-NULL] When set, empty CSV values will be treated as NULL values
--csv-separator[=CSV-SEPARATOR] CSV separator character
--csv-enclosure[=CSV-ENCLOSURE] CSV enclosure character
--csv-escape[=CSV-ESCAPE] CSV escape character
      --xml-node-path=XML-NODE-PATH XML node path to a subtree from which schema should be extracted, for example /root/element. This is not XPath, just node names separated by slashes
--xml-buffer-size=XML-BUFFER-SIZE XML buffer size in bytes
--parquet-columns=PARQUET-COLUMNS Columns to read from parquet file (multiple values allowed)
--parquet-offset=PARQUET-OFFSET Offset to start reading from
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
```

### `parquet:read:metadata`

```shell
$ flow parquet:read:metadata --help
Description:
Read metadata from parquet file

Usage:
parquet:read:metadata [options] [--] <file>

Arguments:
  file Path to a parquet file

Options:
--columns Display column details
--row-groups Display row group details
--column-chunks Display column chunks details
--statistics Display column chunks statistics details
--page-headers Display page headers details
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
```

### `pipeline:run`

```shell
$ flow pipeline:run --help
Description:
Execute ETL pipeline from a php/json file.

Usage:
pipeline:run [options] [--] <pipeline-file>
run

Arguments:
pipeline-file Path to a php/json with DataFrame definition.

Options:
--analyze=ANALYZE Collect processing statistics and print them. [default: false]
      --config=CONFIG Path to a local PHP file that MUST return an instance of: Flow\ETL\Config
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
pipeline-file argument must point to a valid php file that returns DataFrame instance.
  Make sure not to execute run() or any other triggering function.

Example of pipeline.php:
<?php
return df()
->read(from_array([
['id' => 1, 'name' => 'User 01', 'active' => true],
['id' => 2, 'name' => 'User 02', 'active' => false],
['id' => 3, 'name' => 'User 03', 'active' => true],
]))
->collect()
->write(to_output());
```
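To round out the help text above, here is a slightly fuller `pipeline.php` sketch. The data and the `filter()` step are illustrative additions built from DSL functions that appear elsewhere in this commit; treat this as a sketch, not canonical usage.

```php
<?php

declare(strict_types=1);

use function Flow\ETL\DSL\{df, from_array, lit, ref, to_output};

// The file must RETURN the DataFrame definition - pipeline:run triggers
// execution itself, so do not call run() here.
return df()
    ->read(from_array([
        ['id' => 1, 'name' => 'User 01', 'active' => true],
        ['id' => 2, 'name' => 'User 02', 'active' => false],
        ['id' => 3, 'name' => 'User 03', 'active' => true],
    ]))
    ->filter(ref('active')->equals(lit(true)))
    ->collect()
    ->write(to_output());
```

It can then be executed with `flow run pipeline.php`, optionally adding `--analyze=true` to print processing statistics.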
@@ -72,6 +72,10 @@ public function source() : Path
return $this->path;
}

/**
* @param string $pointer
 * @param bool $pointerToEntryName - when true, the pointer will be used as the entry name for extracted data
*/
public function withPointer(string $pointer, bool $pointerToEntryName = false) : self
{
$this->pointer = $pointer;
@@ -18,7 +18,7 @@ final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtra
private ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN;

/**
* @var array<string>
*/
private array $columns = [];

@@ -48,6 +48,10 @@ public function extract(FlowContext $context) : \Generator
$fileRows = $fileData['file']->metadata()->rowsNumber();
$flowSchema = $this->schemaConverter->fromParquet($fileData['file']->schema());

if (count($this->columns)) {
$flowSchema = $flowSchema->keep(...$this->columns);
}

if ($fileOffset > $fileRows) {
$fileData['stream']->close();
$fileOffset -= $fileRows;
5 changes: 4 additions & 1 deletion src/cli/composer.json
@@ -28,7 +28,10 @@
"Flow\\": [
"src/Flow"
]
},
"files": [
"src/Flow/CLI/DSL/functions.php"
]
},
"bin": [
"flow"
9 changes: 6 additions & 3 deletions src/cli/flow
@@ -1,8 +1,9 @@
#!/usr/bin/env php
<?php declare(strict_types=1);

use Flow\CLI\Command\ConvertCommand;
use Flow\CLI\Command\FileReadCommand;
use Flow\CLI\Command\FileRowsCountCommand;
use Flow\CLI\Command\PipelineRunCommand;
use Flow\CLI\Command\FileSchemaCommand;
use Flow\CLI\FlowVersion;
use Flow\ParquetViewer\Command\ReadDataCommand;
@@ -43,7 +44,9 @@ $application = new Application('Flow PHP - Data processing framework', $pharRunt

$application->add((new ReadDataCommand())->setName('parquet:read')->setAliases(['parquet:read:data']));
$application->add((new ReadMetadataCommand())->setName('parquet:read:metadata'));
$application->add((new PipelineRunCommand())->setName('pipeline:run')->setAliases(['run']));
$application->add((new FileReadCommand())->setName('file:read')->setAliases(['read']));
$application->add((new FileSchemaCommand())->setName('file:schema')->setAliases(['schema']));
$application->add((new FileRowsCountCommand())->setName('file:rows:count')->setAliases(['count']));

$application->run();
29 changes: 29 additions & 0 deletions src/cli/src/Flow/CLI/Arguments/FilePathArgument.php
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Flow\CLI\Arguments;

use function Flow\CLI\argument_string;
use function Flow\Filesystem\DSL\path_real;
use Flow\ETL\Config;
use Flow\Filesystem\Path;
use Symfony\Component\Console\Input\InputInterface;

final class FilePathArgument
{
public function __construct(private readonly string $path)
{
}

public function getExisting(InputInterface $input, Config $config) : Path
{
$path = path_real(argument_string($this->path, $input));

if ($config->fstab()->for($path)->status($path) === null) {
throw new \InvalidArgumentException("File '{$path->path()}' does not exist.");
}

return $path;
}
}
