Consider adding flatMapCompletionStage in Helidon Reactive Engine #2225

Closed
hicriss opened this issue Jul 31, 2020 · 9 comments · Fixed by #3339
Labels: enhancement (New feature or request), P3, reactive (Reactive streams and related components)

Comments

@hicriss

hicriss commented Jul 31, 2020

Detail Environment

  • Helidon Version: 2.0
  • Helidon SE

Problem Description

I want to complete each operation of a transaction in sequence, keeping the order of the chain, in Helidon 2.0 SE. I saw in the Helidon MP documentation that it has the flatMapCompletionStage operator.

Example:

First step: insert a record into the employees table.
Second step: get its key.
Third step: insert the employee's address into another table.
Each operation must run in this order.

How can I do that in Helidon SE?

I also tried the flatMapSingle operator, but it either never completed or gave a different result.
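For reference, the ordering asked for here can be illustrated without any Helidon API. The following is a minimal sketch, assuming plain JDK `CompletableFuture`; the methods `insertEmployee`, `fetchKey`, and `insertAddress`, the key value 42, and the sample data are hypothetical stand-ins for real database calls. Each `thenCompose` starts its step only after the previous stage has completed, which is the behaviour the question describes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// JDK-only sketch: every method below is a hypothetical stand-in for a database call.
final class OrderedSteps {

    // Step 1: pretend to insert an employee record.
    static CompletionStage<String> insertEmployee(String name, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("insert employee " + name);
            return name;
        });
    }

    // Step 2: pretend to look up the key of the inserted record (42 is made up).
    static CompletionStage<Long> fetchKey(String name, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("get key of " + name);
            return 42L;
        });
    }

    // Step 3: pretend to insert the address, using the key from step 2.
    static CompletionStage<Long> insertAddress(long employeeId, String address, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("insert address '" + address + "' for employee " + employeeId);
            return employeeId;
        });
    }

    // Chains the three steps; each one starts only after the previous one completed.
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        insertEmployee("Ann", log)
                .thenCompose(name -> fetchKey(name, log))
                .thenCompose(id -> insertAddress(id, "1 Main St", log))
                .toCompletableFuture()
                .join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log entries always appear in step order because no step is even created until the stage before it has produced its value.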

@hicriss hicriss changed the title flatMapCompletionStage in Helidon 2.0 SE Consider adding flatMapCompletionStage in Helidon 2.0 SE Aug 1, 2020
@hicriss hicriss changed the title Consider adding flatMapCompletionStage in Helidon 2.0 SE Consider adding flatMapCompletionStage in Helidon Reactive Engine Aug 2, 2020
@danielkec danielkec self-assigned this Aug 3, 2020
@danielkec
Contributor

danielkec commented Aug 3, 2020

Hi, that's a great idea. I can offer a workaround with the current API in the meantime. All that's needed is limiting Multi.flatMap parallel executions to one, like this:

        Multi.just(1, 2, 3, 4, 5)
                .flatMap(l -> Single.create(CompletableFuture.supplyAsync(() -> {
                    int sleepMillis = Math.abs(new Random().nextInt(500));
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return l + ": slept " + sleepMillis + " milliseconds";
                })), 1, false, 1)
                .forEach(l -> {
                    System.out.println(">>>" + l);
                }).await();
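To make the effect of a concurrency limit of one more concrete, here is a JDK-only analogue. It is a sketch, not Helidon's implementation: `mapSequentially` and `slowTask` are hypothetical helpers, and the two counters exist only to observe how many tasks run at once. Each item's asynchronous task is created only after the previous one has finished, so results keep the input order and at most one task is in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// JDK-only sketch of "one inner operation at a time"; all names are hypothetical.
final class OneAtATime {

    // Applies the async mapper to each item, starting the next only when the previous is done.
    static <T, R> CompletableFuture<List<R>> mapSequentially(
            List<T> items, Function<T, CompletionStage<R>> mapper) {
        CompletableFuture<List<R>> chain = CompletableFuture.completedFuture(new ArrayList<>());
        for (T item : items) {
            chain = chain.thenCompose(results ->
                    mapper.apply(item).thenApply(r -> {
                        results.add(r);
                        return results;
                    }));
        }
        return chain;
    }

    // Simulated slow operation that records how many tasks are running concurrently.
    static CompletionStage<String> slowTask(int n, AtomicInteger inFlight, AtomicInteger maxInFlight) {
        return CompletableFuture.supplyAsync(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(20));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
            return n + ": done";
        });
    }

    // Runs five tasks sequentially and reports the peak concurrency via maxInFlight.
    static List<String> run(AtomicInteger maxInFlight) {
        AtomicInteger inFlight = new AtomicInteger();
        return mapSequentially(List.of(1, 2, 3, 4, 5),
                n -> slowTask(n, inFlight, maxInFlight)).join();
    }

    public static void main(String[] args) {
        AtomicInteger max = new AtomicInteger();
        System.out.println(run(max) + ", max in flight = " + max.get());
    }
}
```

The random sleep would reorder results if the tasks overlapped; because each task is created inside `thenCompose`, they cannot overlap.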

@danielkec danielkec added the reactive Reactive streams and related components label Aug 3, 2020
@hicriss
Author

hicriss commented Aug 4, 2020

Thank you so much, Daniel! However, that code only solves the ordering, doesn't it? My second problem arises when I want to insert records into a three-level master-detail structure, for example:

I have a customer table, an order table, and an order-detail table. A customer has many orders, and an order has many order details (this is a simple example). I tried to use Multi's flatMap operator, and everything worked correctly at the second level, but my order-detail records were wrong: their foreign keys were mixed up, or sometimes the right detail was never inserted. Note: I get my IDs with RETURNING id.

This is my code example:

        dbClient.inTransaction(dbTransaction -> {
            return dbTransaction
                    .createInsert(INSERT_CUSTOMER)
                    .params(paramsCustomer)
                    .execute()
                    .flatMapSingle(customerId -> Multi.create(customer.getOrders())
                            .flatMap(order -> {
                                Map<String, Object> paramsOrder = new HashMap<>();
                                paramsOrder.put("customerId", customerId);
                                paramsOrder.put("name", order.getName());
                                return dbTransaction
                                        .createInsert(INSERT_ORDER)
                                        .params(paramsOrder)
                                        .execute()
                                        .flatMapSingle(orderId -> Multi.create(order.getOrderDetails())
                                                .flatMap(orderDetail -> {
                                                    Map<String, Object> paramsOrderDetail = new HashMap<>();
                                                    paramsOrderDetail.put("orderId", orderId);
                                                    paramsOrderDetail.put("description", orderDetail.getDescription());
                                                    return dbTransaction
                                                            .createInsert(INSERT_ORDER_DETAIL)
                                                            .params(paramsOrderDetail)
                                                            .execute();
                                                }));
                            })
                    );
        });

I read that flatMapCompletionStage waits for each item to complete, but I don't know whether that applies to this case. What am I doing wrong?

@danielkec
Contributor

Hi, it may be left out intentionally, but it seems customerId is not used in paramsOrder?

@hicriss
Author

hicriss commented Aug 4, 2020

Yes, I use customerId in the order and orderId in the order details. I have my Map<String, Object> of params, and the IDs are taken at each level; I just hadn't put that in my example. I have updated my code example.

@barchetta barchetta added enhancement New feature or request P3 labels Aug 6, 2020
@danielkec
Contributor

I think the problem is a misinterpreted return value from the insert statement. As stated in the docs: "Execution of DML statements will always return Single<Long> with the number of modified records in the database."
https://helidon.io/docs/v2/#/se/dbclient/01_introduction

So customerId is actually the number of affected rows and shouldn't be used as a foreign key.

I would access the ID-generating sequence directly, but let's ask @tomas-langer or @Tomas-Kraus what the most convenient DbClient approach is.

@hicriss
Author

hicriss commented Aug 10, 2020

I did this example with Oracle DB. Since I wasn't able to obtain the ID of the inserted record using RETURNING, I had to work around it to return my ID with DbClient and transactions. But I got the same result: the foreign keys got mixed up or were incorrect.

This method inserts a record and returns its ID:

    protected <T> Single<Number> executeInsertStatement(DbTransaction dbExecute, String insertStatement, String getStatement, T values) {
        return dbExecute.createNamedInsert(insertStatement)
                .indexedParam(values)
                .execute()
                .flatMapSingle(aLong -> dbExecute.namedGet(getStatement)
                        .map(rowOptional -> rowOptional.map(dbRow ->
                                dbRow.column("ID").as(Number.class)).orElse(null)
                        ));
    }

And this is my method to insert the master-detail data, following my previous explanation:

    public CompletionStage<Number> insert(PokemonType pokemonType) {
        return dbClient.inTransaction(dbExecute ->
                executeInsertStatement(dbExecute, "insert-type", "get-type-current-id", pokemonType)
                        .flatMap(typeId -> Multi.create(pokemonType.getPokemons())
                                .flatMap(pokemon -> {
                                    pokemon.setIdType(typeId);
                                    LOGGER.log(Level.INFO, "Pokemon: " + pokemon);

                                    return executeInsertStatement(dbExecute, "insert-pokemon", "get-pokemon-current-id", pokemon)
                                            .flatMap(pokemonId -> Multi.create(pokemon.getPokemonEvolutions())
                                                    .flatMap(pokemonEvolution -> {
                                                        pokemonEvolution.setIdPokemon(pokemonId);
                                                        LOGGER.log(Level.INFO, "Evolution: " + pokemonEvolution);

                                                        return executeInsertStatement(dbExecute, "insert-evolution", "get-evolution-current-id", pokemonEvolution);
                                                    }));
                                })
                                .collectList()
                                .map(numbers -> typeId))
                        .first());
    }

These are my statements:

  statements:
    # Get id records created from database
    get-type-current-id: "SELECT MAX(ID) ID FROM POKEMON_TYPE"
    get-pokemon-current-id: "SELECT MAX(ID) ID FROM POKEMON"
    get-evolution-current-id: "SELECT MAX(ID) ID FROM POKEMON_EVOLUTION"
    # Insert records into database
    insert-type: "INSERT INTO POKEMON_TYPE(id, name) VALUES(?, ?)"
    insert-pokemon: "INSERT INTO POKEMON(id, name, id_type) VALUES(?, ?, ?)"
    insert-evolution: "INSERT INTO POKEMON_EVOLUTION (id, evolution, id_pokemon) VALUES(?, ?, ?)"

This is the data I post:

{
    "id": 1,
    "name": "Plant",
    "pokemons": [
        {
            "id": 1,
            "name": "Bulbasaur",
            "idType": null,
            "pokemonEvolutions": [
                {
                    "id": 1,
                    "evolution": "Bulbasaur",
                    "idPokemon": null
                },
                {
                    "id": 2,
                    "evolution": "Ivysaur",
                    "idPokemon": null
                },
                {
                    "id": 3,
                    "evolution": "Venusaur",
                    "idPokemon": null
                }
            ]
        },
        {
            "id": 2,
            "name": "Bellsprout",
            "idType": null,
            "pokemonEvolutions": [
                {
                    "id": 4,
                    "evolution": "Bellsprout",
                    "idPokemon": null
                },
                {
                    "id": 5,
                    "evolution": "Weepinbell",
                    "idPokemon": null
                },
                {
                    "id": 6,
                    "evolution": "Victreebel",
                    "idPokemon": null
                }
            ]
        },
        {
            "id": 3,
            "name": "Snivy",
            "idType": null,
            "pokemonEvolutions": [
                {
                    "id": 7,
                    "evolution": "Snivy",
                    "idPokemon": null
                },
                {
                    "id": 8,
                    "evolution": "Servine",
                    "idPokemon": null
                },
                {
                    "id": 9,
                    "evolution": "Serperior",
                    "idPokemon": null
                }
            ]
        }
    ]
}

This is the result in my DB. As you can see, the posted data doesn't match the data in the DB. I hope you can help me with this simple example.

    TYPE_NAME  ID  NAME        ID_POKEMON  EVOLUTION
    Plant      1   Bulbasaur
    Plant      2   Bellsprout
    Plant      3   Snivy       3           Serperior
    Plant      3   Snivy       3           Bellsprout
    Plant      3   Snivy       3           Venusaur
    Plant      3   Snivy       3           Weepinbell
    Plant      3   Snivy       3           Victreebel
    Plant      3   Snivy       3           Ivysaur
    Plant      3   Snivy       3           Servine
    Plant      3   Snivy       3           Snivy
    Plant      3   Snivy       3           Bulbasaur
@danielkec
Contributor

@crispyCrowl Have you tried limiting the prefetch and concurrency of the flatMap as suggested above?

.flatMap(l -> {db stuff}, 1, false, 1)

Right now you are creating a race condition between inserting and reading your max() value.

Also reconsider the whole design of using aggregations to get your ID, as it's quite inefficient. Calling your sequence directly would be much more efficient. Just call select pok_seq.nextval from dual; before your insert, use the value in the insert, and save it for the foreign keys. That way you won't create a read lock, not to mention how expensive an aggregation function gets on a larger data set.
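The difference between the two ID strategies can be shown with a small in-memory simulation. This is a sketch under stated assumptions: `Table` is a hypothetical stand-in for a database table, an `AtomicLong` plays the role of a sequence such as pok_seq, and the interleaving is written out by hand rather than produced by real threads. When all inserts run before any follow-up max() read, every caller sees the same key; when each caller reserves its key from the sequence first, insert order no longer matters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// In-memory simulation only; no database or Helidon API is involved.
final class IdStrategies {

    // Hypothetical stand-in for a table: it only remembers the inserted ids.
    static final class Table {
        private final List<Long> ids = new ArrayList<>();

        void insert(long id) {
            ids.add(id);
        }

        // Equivalent in spirit to SELECT MAX(ID).
        long maxId() {
            return Collections.max(ids);
        }
    }

    // Three inserts complete before any caller reads max(): every caller gets the same key.
    static List<Long> maxAfterInterleavedInserts() {
        Table table = new Table();
        table.insert(1);
        table.insert(2);
        table.insert(3);
        return List.of(table.maxId(), table.maxId(), table.maxId()); // [3, 3, 3]
    }

    // Each caller reserves its key from the sequence first, then inserts in any order.
    static List<Long> sequenceFirst() {
        AtomicLong sequence = new AtomicLong(); // stand-in for a database sequence
        long first = sequence.incrementAndGet();
        long second = sequence.incrementAndGet();
        long third = sequence.incrementAndGet();

        Table table = new Table();
        table.insert(third); // insert order is deliberately scrambled
        table.insert(first);
        table.insert(second);
        return List.of(first, second, third); // [1, 2, 3]
    }

    public static void main(String[] args) {
        System.out.println("max() after interleaved inserts: " + maxAfterInterleavedInserts());
        System.out.println("sequence first: " + sequenceFirst());
    }
}
```

The first result mirrors the table in the report, where every evolution row ended up with ID_POKEMON 3.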

@tomas-langer
Member

Helidon DbClient currently does not support returning generated keys from the database.
Using max(id) is very error-prone as soon as you run in parallel, as you may get an ID inserted by another thread.
There are database-specific ways to handle this. For Oracle DB, for example, you can do
INSERT INTO table (column) VALUES (?) RETURNING id
and invoke this as a query: you would not get the number of updated rows, but you would get the ID as a query result.

(I am not trying to resolve the flatMap operations, just how to get the generated id)

@tomas-langer
Member

Added #2279 for analysis of this feature (returning generated ids)
