diff --git a/build/runtime.php b/build/runtime.php index f61cf5a8b..e78f4cc05 100644 --- a/build/runtime.php +++ b/build/runtime.php @@ -22,6 +22,8 @@ exit(1); } +\ini_set('memory_limit', -1); + (new SingleCommandApplication()) ->setName('Flow-PHP - Extract Transform Load - Data processing framework') ->setVersion(FlowVersion::getVersion()) diff --git a/examples/bootstrap.php b/examples/bootstrap.php index 872e8150f..6077883ff 100644 --- a/examples/bootstrap.php +++ b/examples/bootstrap.php @@ -1,5 +1,11 @@ read(CSV::from(__FLOW_DATA__ . '/power-plant-daily.csv', 10, delimiter: ';')) ->withEntry('unpacked', ref('row')->unpack()) ->renameAll('unpacked.', '') @@ -45,3 +45,9 @@ ->withEntry('consumption', ref('consumption')->multiply(lit(100))->round(lit(2))) ->withEntry('consumption', concat(ref('consumption'), lit('%'))) ->write(To::output(truncate: false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/aggregations/power_plant_bar_chart.php b/examples/topics/aggregations/power_plant_bar_chart.php index cfe95a4ae..08e0f52ed 100644 --- a/examples/topics/aggregations/power_plant_bar_chart.php +++ b/examples/topics/aggregations/power_plant_bar_chart.php @@ -13,7 +13,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow) +$flow = (new Flow) ->read(CSV::from(__FLOW_DATA__ . '/power-plant-daily.csv', 10, delimiter: ';')) ->withEntry('unpacked', ref('row')->unpack()) ->renameAll('unpacked.', '') @@ -50,3 +50,9 @@ output: __FLOW_OUTPUT__ . '/power_plant_bar_chart.html' ) ); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/http/psr_http_dynamic.php b/examples/topics/http/psr_http_dynamic.php index ba36542c6..9946d9199 100644 --- a/examples/topics/http/psr_http_dynamic.php +++ b/examples/topics/http/psr_http_dynamic.php @@ -30,7 +30,7 @@ public function create(?Message\ResponseInterface $previousResponse = null) : ?M } }); -return (new Flow()) +$flow = (new Flow()) ->read($extractor) ->withEntry('unpacked', ref('response_body')->jsonDecode()) ->select('unpacked') @@ -38,3 +38,9 @@ public function create(?Message\ResponseInterface $previousResponse = null) : ?M ->renameAll('unpacked.', '') ->select('name', 'html_url', 'blog') ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/join/left_anti/left_anti_join.php b/examples/topics/join/left_anti/left_anti_join.php index 3ef332392..85454db70 100644 --- a/examples/topics/join/left_anti/left_anti_join.php +++ b/examples/topics/join/left_anti/left_anti_join.php @@ -30,7 +30,7 @@ * then it might become performance bottleneck. * In that case please look at DataFrame::joinEach. */ -return (new Flow()) +$flow = (new Flow()) ->process($externalProducts) ->join( (new Flow())->process($internalProducts), @@ -39,6 +39,12 @@ ) ->write(To::output()); +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); + // Output // // +--+---------+ diff --git a/examples/topics/join/left_anti/left_anti_join_each.php b/examples/topics/join/left_anti/left_anti_join_each.php index 6b9723b87..3425aeba0 100644 --- a/examples/topics/join/left_anti/left_anti_join_each.php +++ b/examples/topics/join/left_anti/left_anti_join_each.php @@ -69,7 +69,7 @@ private function findRowsInDatabase(Rows $rows) : Rows * right size is much bigger then a left side. In that case it's better to reduce the ride side * by fetching from the storage only what is relevant for the left side. */ -return (new Flow()) +$flow = (new Flow()) ->extract($apiExtractor) ->joinEach( $dbDataFrameFactory, @@ -78,6 +78,12 @@ private function findRowsInDatabase(Rows $rows) : Rows ) ->write(To::output()); +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); + // Output: // // +-----+-------------+ diff --git a/examples/topics/transformations/aggregate.php b/examples/topics/transformations/aggregate.php index c8fdd7979..dea2b4b10 100644 --- a/examples/topics/transformations/aggregate.php +++ b/examples/topics/transformations/aggregate.php @@ -13,7 +13,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::with(Entry::int('a', 100)), @@ -25,3 +25,9 @@ ) ->aggregate(Aggregation::sum(ref('a'))) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/array_expand.php b/examples/topics/transformations/array_expand.php index 999aaf0f0..c3bf69e19 100644 --- a/examples/topics/transformations/array_expand.php +++ b/examples/topics/transformations/array_expand.php @@ -13,7 +13,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::with(Entry::int('id', 1), Entry::array('array', ['a' => 1, 'b' => 2, 'c' => 3])), @@ -22,3 +22,9 @@ ->write(To::output(false)) ->withEntry('expanded', array_expand(ref('array'))) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/array_unpack.php b/examples/topics/transformations/array_unpack.php index d976b124f..b753f08ce 100644 --- a/examples/topics/transformations/array_unpack.php +++ b/examples/topics/transformations/array_unpack.php @@ -12,7 +12,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::with(Entry::int('id', 1), Entry::array('array', ['a' => 1, 'b' => 2, 'c' => 3])), @@ -22,3 +22,9 @@ ->write(To::output(false)) ->withEntry('unpacked', ref('array')->unpack()) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/filter_divide.php b/examples/topics/transformations/filter_divide.php index 3b6d8eaf2..32f002e61 100644 --- a/examples/topics/transformations/filter_divide.php +++ b/examples/topics/transformations/filter_divide.php @@ -13,7 +13,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::with(Entry::int('a', 100), Entry::int('b', 100)), @@ -23,3 +23,9 @@ ->filter(ref('b')->divide(lit(2))->equals(lit('a'))) ->withEntry('new_b', ref('b')->multiply(lit(2))->multiply(lit(5))) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/filter_mod.php b/examples/topics/transformations/filter_mod.php index 9dd70dfba..86e4d2d7b 100644 --- a/examples/topics/transformations/filter_mod.php +++ b/examples/topics/transformations/filter_mod.php @@ -13,7 +13,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::with(Entry::int('a', 4), Entry::int('b', 5)), @@ -22,3 +22,9 @@ ) ->filter(ref('b')->mod(lit(2))->equals(lit(0))) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/group_by.php b/examples/topics/transformations/group_by.php index 1c5afa278..f5b768787 100644 --- a/examples/topics/transformations/group_by.php +++ b/examples/topics/transformations/group_by.php @@ -12,7 +12,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::with(Entry::int('a', 100)), @@ -24,3 +24,9 @@ ) ->groupBy(ref('a')) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/literals.php b/examples/topics/transformations/literals.php index b7d43b0c5..abe2b5b50 100644 --- a/examples/topics/transformations/literals.php +++ b/examples/topics/transformations/literals.php @@ -12,7 +12,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::create(Entry::string('name', 'Norbert')) @@ -20,3 +20,9 @@ ) ->withEntry('number', lit(1)) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/math.php b/examples/topics/transformations/math.php index 823c4be30..7699c81ac 100644 --- a/examples/topics/transformations/math.php +++ b/examples/topics/transformations/math.php @@ -12,7 +12,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::create(Entry::integer('a', 100), Entry::integer('b', 200)) @@ -22,3 +22,9 @@ ->withEntry('c', ref('a')->plus(ref('b'))) ->withEntry('d', ref('b')->minus(ref('a'))) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/size.php b/examples/topics/transformations/size.php index 36b4ba508..ff8155851 100644 --- a/examples/topics/transformations/size.php +++ b/examples/topics/transformations/size.php @@ -12,7 +12,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read( From::rows(new Rows( Row::with(Entry::int('id', 1), Entry::array('array', ['a' => 1, 'b' => 2, 'c' => 3])), @@ -20,3 +20,9 @@ ) ->withEntry('array_size', ref('array')->size()) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/sort.php b/examples/topics/transformations/sort.php index 05cf8d5fa..867b9631e 100644 --- a/examples/topics/transformations/sort.php +++ b/examples/topics/transformations/sort.php @@ -9,7 +9,13 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read(From::sequence_number('id', 0, 10)) ->sortBy(ref('id')->desc()) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/when_null.php b/examples/topics/transformations/when_null.php index bc3c78932..4d53d79d0 100644 --- a/examples/topics/transformations/when_null.php +++ b/examples/topics/transformations/when_null.php @@ -14,7 +14,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read(From::rows(new Rows( Row::with(Entry::int('id', 1), Entry::int('value', 1)), Row::with(Entry::int('id', 2), Entry::int('value', 1)), @@ -27,3 +27,9 @@ when(ref('value')->isNull(), then: lit(0)) ) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/transformations/when_odd.php b/examples/topics/transformations/when_odd.php index 2e018f392..9b5b3a88d 100644 --- a/examples/topics/transformations/when_odd.php +++ b/examples/topics/transformations/when_odd.php @@ -11,7 +11,7 @@ require __DIR__ . '/../../bootstrap.php'; -return (new Flow()) +$flow = (new Flow()) ->read(From::sequence_number('number', 1, 100)) ->collect() ->withEntry( @@ -23,3 +23,9 @@ ) ) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/types/csv/csv_read.php b/examples/topics/types/csv/csv_read.php index 52b8b8e78..8124758bc 100644 --- a/examples/topics/types/csv/csv_read.php +++ b/examples/topics/types/csv/csv_read.php @@ -10,20 +10,24 @@ require __DIR__ . '/../../../bootstrap.php'; +$flow = (new Flow()) + ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 1000)) + ->withEntry('unpacked', ref('row')->unpack()) + ->renameAll('unpacked.', '') + ->drop(col('row')) + ->limit(10_000); + +if ('' !== \Phar::running(false)) { + return $flow; +} + $csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.csv') / 1024 / 1024); print "Reading CSV {$csvFileSize}Mb file...\n"; $stopwatch = new Stopwatch(); $stopwatch->start(); -$total = 0; -(new Flow()) - ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 1000)) - ->withEntry('unpacked', ref('row')->unpack()) - ->renameAll('unpacked.', '') - ->drop(col('row')) - ->limit(10_000) - ->run(); +$flow->run(); $stopwatch->stop(); diff --git a/examples/topics/types/csv/csv_read_partitioned.php b/examples/topics/types/csv/csv_read_partitioned.php index 22fd8be93..a51a4e006 100644 --- a/examples/topics/types/csv/csv_read_partitioned.php +++ b/examples/topics/types/csv/csv_read_partitioned.php @@ -6,47 +6,31 @@ use function Flow\ETL\DSL\ref; use Aeon\Calendar\Stopwatch; use Flow\ETL\DSL\CSV; -use Flow\ETL\DSL\Partitions; use Flow\ETL\DSL\To; use Flow\ETL\Flow; require __DIR__ . '/../../../bootstrap.php'; -print "Reading partitioned CSV dataset...\n"; - -$stopwatch = new Stopwatch(); -$stopwatch->start(); -$total = 0; - -(new Flow()) +$flow = (new Flow()) ->read(CSV::from(__FLOW_DATA__ . '/partitioned')) ->withEntry('unpacked', ref('row')->unpack()) ->renameAll('unpacked.', '') ->drop(col('row')) ->collect() ->sortBy(ref('id')) - ->write(To::output()) - ->run(); - -$stopwatch->lap(); + ->write(To::output()); -print "Total elapsed time: {$stopwatch->elapsedTime(1)->inSecondsPrecise()}s\n\n"; +if ('' !== \Phar::running(false)) { + return $flow; +} -print "Reading partitioned CSV dataset with partition filtering...\n"; - -(new Flow()) - ->read(CSV::from(__FLOW_DATA__ . '/partitioned')) - ->withEntry('unpacked', ref('row')->unpack()) - ->renameAll('unpacked.', '') - ->drop('row') - ->collect() - ->filterPartitions(Partitions::only('t_shirt_color', 'green')) - ->sortBy(ref('id')) - ->write(To::output()) - ->run(); +print "Reading partitioned CSV dataset...\n"; -$stopwatch->lap(); +$stopwatch = new Stopwatch(); +$stopwatch->start(); -print "Total elapsed time: {$stopwatch->elapsedTime(2)->inSecondsPrecise()}s\n"; +$flow->run(); $stopwatch->stop(); + +print "Total elapsed time: {$stopwatch->totalElapsedTime()->inSecondsPrecise()}s\n"; diff --git a/examples/topics/types/csv/csv_read_partitioned_filter.php b/examples/topics/types/csv/csv_read_partitioned_filter.php new file mode 100644 index 000000000..1242919d8 --- /dev/null +++ b/examples/topics/types/csv/csv_read_partitioned_filter.php @@ -0,0 +1,37 @@ +read(CSV::from(__FLOW_DATA__ . '/partitioned')) + ->withEntry('unpacked', ref('row')->unpack()) + ->renameAll('unpacked.', '') + ->drop('row') + ->collect() + ->filterPartitions(Partitions::only('t_shirt_color', 'green')) + ->sortBy(ref('id')) + ->write(To::output()); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$stopwatch = new Stopwatch(); +$stopwatch->start(); + +print "Reading partitioned CSV dataset with partition filtering...\n"; + +$flow->run(); + +$stopwatch->stop(); + +print "Total elapsed time: {$stopwatch->totalElapsedTime()->inSecondsPrecise()}s\n"; diff --git a/examples/topics/types/csv/csv_to_avro.php b/examples/topics/types/csv/csv_to_avro.php index 0bc163cdd..8cc46dfdd 100644 --- a/examples/topics/types/csv/csv_to_avro.php +++ b/examples/topics/types/csv/csv_to_avro.php @@ -11,25 +11,29 @@ require __DIR__ . '/../../../bootstrap.php'; +$flow = (new Flow()) + ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 10_000)) + ->withEntry('unpacked', ref('row')->unpack()) + ->renameAll('unpacked.', '') + ->drop(col('row')) + ->rename('last name', 'last_name') + ->write(Avro::to(__FLOW_OUTPUT__ . '/dataset.avro')); + +if ('' !== \Phar::running(false)) { + return $flow; +} + $csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.csv') / 1024 / 1024); print "Converting CSV {$csvFileSize}Mb file into avro...\n"; $stopwatch = new Stopwatch(); $stopwatch->start(); -$total = 0; -(new Flow()) - ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 10_000)) - ->withEntry('unpacked', ref('row')->unpack()) - ->renameAll('unpacked.', '') - ->drop(col('row')) - ->rename('last name', 'last_name') - ->write(Avro::to(__FLOW_OUTPUT__ . '/dataset.avro')) - ->run(); +$flow->run(); $stopwatch->stop(); print "Total elapsed time: {$stopwatch->totalElapsedTime()->inSecondsPrecise()}s\n\n"; -$parquetFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.avro') / 1024 / 1024); -print "Output avro file size {$parquetFileSize}Mb\n"; +$arvoFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.avro') / 1024 / 1024); +print "Output avro file size {$arvoFileSize}Mb\n"; diff --git a/examples/topics/types/csv/csv_to_json.php b/examples/topics/types/csv/csv_to_json.php index d8414ea5e..3c0da7669 100644 --- a/examples/topics/types/csv/csv_to_json.php +++ b/examples/topics/types/csv/csv_to_json.php @@ -12,9 +12,15 @@ \unlink(__FLOW_OUTPUT__ . '/dataset.json'); } -return (new Flow()) +$flow = (new Flow()) ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv')) ->withEntry('unpacked', ref('row')->unpack()) ->renameAll('unpacked.', '') ->drop(col('row')) ->write(Json::to(__FLOW_OUTPUT__ . '/dataset.json')); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$flow->run(); diff --git a/examples/topics/types/csv/csv_to_parquet_100k.php b/examples/topics/types/csv/csv_to_parquet_100k.php index 501db02ed..716728e31 100644 --- a/examples/topics/types/csv/csv_to_parquet_100k.php +++ b/examples/topics/types/csv/csv_to_parquet_100k.php @@ -11,20 +11,24 @@ require __DIR__ . '/../../../bootstrap.php'; +$flow = (new Flow()) + ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 10_000)) + ->withEntry('unpacked', ref('row')->unpack()) + ->renameAll('unpacked.', '') + ->drop(col('row')) + ->write(Parquet::to(__FLOW_OUTPUT__ . '/dataset_100k.parquet', 100_000)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + $csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.csv') / 1024 / 1024); print "Converting CSV {$csvFileSize}Mb file into parquet...\n"; $stopwatch = new Stopwatch(); $stopwatch->start(); -$total = 0; -(new Flow()) - ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 10_000)) - ->withEntry('unpacked', ref('row')->unpack()) - ->renameAll('unpacked.', '') - ->drop(col('row')) - ->write(Parquet::to(__FLOW_OUTPUT__ . '/dataset_100k.parquet', 100_000)) - ->run(); +$flow->run(); $stopwatch->stop(); diff --git a/examples/topics/types/csv/csv_to_parquet_10k.php b/examples/topics/types/csv/csv_to_parquet_10k.php index 648f4ca1c..ae0b69032 100644 --- a/examples/topics/types/csv/csv_to_parquet_10k.php +++ b/examples/topics/types/csv/csv_to_parquet_10k.php @@ -11,20 +11,24 @@ require __DIR__ . '/../../../bootstrap.php'; +$flow = (new Flow()) + ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 10_000)) + ->withEntry('unpacked', ref('row')->unpack()) + ->renameAll('unpacked.', '') + ->drop(col('row')) + ->write(Parquet::to(__FLOW_OUTPUT__ . '/dataset_10k.parquet', 10_000)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + $csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.csv') / 1024 / 1024); print "Converting CSV {$csvFileSize}Mb file into parquet...\n"; $stopwatch = new Stopwatch(); $stopwatch->start(); -$total = 0; -(new Flow()) - ->read(CSV::from(__FLOW_OUTPUT__ . '/dataset.csv', 10_000)) - ->withEntry('unpacked', ref('row')->unpack()) - ->renameAll('unpacked.', '') - ->drop(col('row')) - ->write(Parquet::to(__FLOW_OUTPUT__ . '/dataset_10k.parquet', 10_000)) - ->run(); +$flow->run(); $stopwatch->stop(); diff --git a/examples/topics/types/csv/php_to_csv.php b/examples/topics/types/csv/php_to_csv.php index 89b166a3d..0f8a52d79 100644 --- a/examples/topics/types/csv/php_to_csv.php +++ b/examples/topics/types/csv/php_to_csv.php @@ -13,19 +13,23 @@ $extractor = require __FLOW_DATA__ . '/extractor.php'; +$flow = (new Flow()) + ->read($extractor) + ->withEntry('unpacked', ref('row')->unpack()) + ->renameAll('unpacked.', '') + ->drop(col('row')) + ->write(CSV::to(__FLOW_OUTPUT__ . '/dataset.csv')); + +if ('' !== \Phar::running(false)) { + return $flow; +} + $stopwatch = new Stopwatch(); $stopwatch->start(); -$total = 0; $memory = new Consumption(); $memory->current(); -(new Flow()) - ->read($extractor) - ->withEntry('unpacked', ref('row')->unpack()) - ->renameAll('unpacked.', '') - ->drop(col('row')) - ->write(CSV::to(__FLOW_OUTPUT__ . '/dataset.csv')) - ->run(); +$flow->run(); $memory->current(); $stopwatch->stop(); diff --git a/examples/topics/types/csv/php_to_csv_and_json.php b/examples/topics/types/csv/php_to_csv_and_json.php index 6a6ef02a3..9617aa63a 100644 --- a/examples/topics/types/csv/php_to_csv_and_json.php +++ b/examples/topics/types/csv/php_to_csv_and_json.php @@ -14,20 +14,24 @@ $extractor = require __FLOW_DATA__ . '/extractor.php'; -$stopwatch = new Stopwatch(); -$stopwatch->start(); -$total = 0; -$memory = new Consumption(); -$memory->current(); - -(new Flow()) +$flow = (new Flow()) ->read($extractor) ->withEntry('unpacked', ref('row')->unpack()) ->renameAll('unpacked.', '') ->drop(col('row')) ->write(CSV::to(__FLOW_OUTPUT__ . '/dataset.csv')) - ->write(Json::to(__FLOW_OUTPUT__ . '/dataset.json')) - ->run(); + ->write(Json::to(__FLOW_OUTPUT__ . '/dataset.json')); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$stopwatch = new Stopwatch(); +$stopwatch->start(); +$memory = new Consumption(); +$memory->current(); + +$flow->run(); $memory->current(); $stopwatch->stop(); diff --git a/examples/topics/types/csv/php_to_csv_partition.php b/examples/topics/types/csv/php_to_csv_partition.php index e55dac48c..e44938302 100644 --- a/examples/topics/types/csv/php_to_csv_partition.php +++ b/examples/topics/types/csv/php_to_csv_partition.php @@ -13,19 +13,23 @@ $extractor = require __FLOW_DATA__ . '/extractor.php'; -$stopwatch = new Stopwatch(); -$stopwatch->start(); -$total = 0; - -(new Flow()) +$flow = (new Flow()) ->read($extractor) ->withEntry('unpacked', ref('row')->unpack()) ->renameAll('unpacked.', '') ->drop(col('row')) ->mode(SaveMode::Overwrite) ->partitionBy('country_code', 't_shirt_color') - ->write(CSV::to(__FLOW_OUTPUT__ . '/partitioned')) - ->run(); + ->write(CSV::to(__FLOW_OUTPUT__ . '/partitioned')); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +$stopwatch = new Stopwatch(); +$stopwatch->start(); + +$flow->run(); $stopwatch->stop(); diff --git a/examples/topics/types/xml/reading.php b/examples/topics/types/xml/reading.php index b93ce3021..4a9a5b6a6 100644 --- a/examples/topics/types/xml/reading.php +++ b/examples/topics/types/xml/reading.php @@ -8,8 +8,14 @@ require __DIR__ . '/../../../bootstrap.php'; -print "Reading XML dataset...\n"; - -return (new Flow()) +$flow = (new Flow()) ->read(XML::from(__FLOW_DATA__ . '/simple_items.xml', 'root/items/item')) ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +print "Reading XML dataset...\n"; + +$flow->run(); diff --git a/examples/topics/types/xml/salaries.php b/examples/topics/types/xml/salaries.php index c8655e275..242f136a4 100644 --- a/examples/topics/types/xml/salaries.php +++ b/examples/topics/types/xml/salaries.php @@ -10,9 +10,7 @@ require __DIR__ . '/../../../bootstrap.php'; -print "Reading XML dataset...\n"; - -return (new Flow()) +$flow = (new Flow()) ->read(XML::from(__FLOW_DATA__ . '/salaries.xml')) ->withEntry('months', ref('row')->xpath('/Salaries/Month')) ->withEntry('month', ref('months')->expand()) @@ -26,3 +24,11 @@ ->aggregate(Aggregation::sum(ref('department_salary'))) ->rename('department_salary_sum', 'total_monthly_salaries') ->write(To::output(false)); + +if ('' !== \Phar::running(false)) { + return $flow; +} + +print "Reading XML dataset...\n"; + +$flow->run();