Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Request: hook to get ResultSet before calling next #42

Open
ahtokca opened this issue Mar 18, 2019 · 3 comments
Open

Feature Request: hook to get ResultSet before calling next #42

ahtokca opened this issue Mar 18, 2019 · 3 comments

Comments

@ahtokca
Copy link

ahtokca commented Mar 18, 2019

First things first - thanks for the amazing state of the art software.

I use it to stream large volume of data. It will be awesome to have a hook/callback called at the moment when ResultSet is just obtained. In particular I map ResultSet to CSV. It is all fine if result is not empty however I'd like return just a header (column names) when there the result is empty something like below.

public <T> Flowable<T> get(BiFunction<ResultSet, T> onResultSetIsReady, BiFunction<ResultSet, T> mapper) 

Or may be extends ResultSetMapper?

And use it something like below

sb.select(sql).get(::toHeaderLine, ::toCsvLine): Flowable<String>

Thanks one more time for the amazing software

@davidmoten
Copy link
Owner

davidmoten commented Mar 18, 2019

Thanks @ahtokca, glad it's useful.

I think the functionality you are looking for sits better outside the library. The thing to remember is that nothing stops you mapping the ResultSet to itself in the get method (example below). The method is there to encourage users to use the ResultSet as soon as possible so it is not exposed to out of order processing (because the same ResultSet object is returned for every row).

Flowable<String> lines = 
  Flowable.defer(() -> {
     boolean[] isFirst = new boolean[] {true};
     return sb.select(sql)
        .get(rs -> rs)  
        .map(rs -> {
            String s = (isFirst[0] ? toHeaderLine(rs)  + "\n":"") +  toCsv(rs));
            isFirst[0] = false;
            return s;
        });
  });

The defer creation method is required to make the Flowable reusable (isFirst state per subscription).

@ahtokca
Copy link
Author

ahtokca commented Mar 20, 2019

Hi @davidmoten, thanks for the quick answer. Indeed it is how I used it however for a corner case when query is empty it produce not header. So I come up with overriding Select

private static <T> Flowable<T> create(Connection con, String sql, Flowable<List<Object>> parameterGroups, int fetchSize,
                                          Function<? super ResultSet, T> mapper, boolean eagerDispose,
                                          Function<ResultSetMetaData, T> headerMapper) {
        log.debug("Select.create called with con={}", con);
        Callable<NamedPreparedStatement> initialState = () -> Util.prepare(con, fetchSize, sql);
        Function<NamedPreparedStatement, Flowable<T>> observableFactory = (ps) -> parameterGroups.flatMap((parameters) -> {
            Flowable<T> rows = create(ps.ps, parameters, mapper, ps.names, sql, fetchSize);
            if (headerMapper != null) {
                Flowable<T> header = Flowable.fromCallable(()->headerMapper.apply(ps.ps.getMetaData()));
                rows = Flowable.concat(header, rows);
            }
            return rows;
        }, true, 1);
        Consumer<NamedPreparedStatement> disposer = Util::closePreparedStatementAndConnection;
        return Flowable.using(initialState, observableFactory, disposer, eagerDispose);
    }

@davidmoten
Copy link
Owner

@ahtokca Good point. I hadn't considered the empty ResultSet case. Some API addition might be good.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants