Skip to content

Commit

Permalink
improve co-routine documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jillesvangurp committed Dec 8, 2019
1 parent 19f3b72 commit 6bbc5de
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 26 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ The Java client is designed for Java users and comes with a lot of things that a
This client cuts down on the boiler plate and uses Kotlin's DSL features, extension functions, etc. to layer a
friendly API over the underlying client functionality.

Kotlin also has support for co-routines and we use this to make using the asynchronous methods in the Java client a
lot nicer to use. Basics for this are in place but Kotlin's co-routine support is still evolving and some of the things we
use are still labeled experimental.
Kotlin also has support for **co-routines** and we use this to make using the asynchronous methods in the Java client a
lot nicer to use. To do so, we generate kotlin coroutine friendly versions of nearly all asynchronous methods
using a code generator. The generated code uses `suspendCancellableCoroutine` to create suspend functions
that do the right thing if something happens to the coroutine context.

Finally, the despite being a high level client, the official client lacks a few abstractions that result in a lot of
things being left as an exercise to the user. This library provides some helpful abstractions to deal with the common
Expand Down
61 changes: 51 additions & 10 deletions manual/coroutines.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ The RestHighLevelClient exposes asynchronous versions of most APIs that take a c
the response when it comes back. Using this is kind of boiler plate heavy.

Luckily, Kotlin has co-routines for asynchronous programming and this library provides co-routine
friendly versions of these functions. They each work pretty much the same way as their synchronous
version except they are marked as suspend and use a `SuspendingActionListener` that uses Kotlin's
`suspendCancellableCoroutine` to wrap the callback that the rest high level client expects.
friendly versions of these functions. These `suspend` functions work pretty much the same way as their
synchronous version except they are marked as suspend and use a `SuspendingActionListener` that uses
Kotlin's `suspendCancellableCoroutine` to wrap the callback that the rest high level client expects.

If you use an asynchronous server framework such as ktor or Spring Boot (in reactive mode), you'll
want to use these.
As of Elasticsearch 7.5.0, all asynchronous calls return a `Cancellable` object that allows you to cancel
the task. Using `suspendCancellableCoRoutine` uses this and this means that if you have some failure
or abort a coroutine scope, all the running tasks are cancelled.

If you use an asynchronous server framework such as Ktor or Spring Boot 2.x (in reactive mode), you'll
want to use the asynchronous functions.

To support co-routines, this project is using a
[code generation plugin](https://github.com/jillesvangurp/es-kotlin-codegen-plugin)
Expand All @@ -26,6 +30,7 @@ As an example, here are three ways to use the reloadAnalyzers API:
// the synchronous version as provided by the RestHighLevel client
val indicesClient = esClient.indices()
val response = indicesClient.reloadAnalyzers(ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT)

// the asynchronous version with a callback as provided by the RestHighLevel client
indicesClient.reloadAnalyzersAsync(ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT, object : ActionListener<ReloadAnalyzersResponse> {
override fun onFailure(e: Exception) {
Expand All @@ -38,7 +43,7 @@ indicesClient.reloadAnalyzersAsync(ReloadAnalyzersRequest("myindex"), RequestOpt
})

runBlocking {
// the coroutine friendly version generated by the code generator plugin
// the coroutine friendly version using a function generated by the code generator plugin
// this is a suspend version so we put it in a runBlocking to get a coroutine scope
// use a more appropriate scope in your own application of course.
val response = indicesClient.reloadAnalyzersAsync(ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT)
Expand All @@ -49,12 +54,13 @@ This works the same for all the async functions in the Java client.

## IndexDAO Async

Of course, [`IndexDAO`](https://github.com/jillesvangurp/es-kotlin-wrapper-client/tree/master/src/main/kotlin/io/inbot/eskotlinwrapper/IndexDAO.kt) has async functions as well and using that is
Of course, [`IndexDAO`](https://github.com/jillesvangurp/es-kotlin-wrapper-client/tree/master/src/main/kotlin/io/inbot/eskotlinwrapper/IndexDAO.kt) has async functions as well and using that works
exactly the same as the synchronous version.

```kotlin
runBlocking {
// simply use async versions the same way as you would use the regular versions
// it will suspend at the appropriate moments
thingDao.bulkAsync(refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE) {
1.rangeTo(50).forEach {
index("$it",Thing("document $it"))
Expand All @@ -72,11 +78,46 @@ We indexed a lot of things asynchronously : 50
```

Another useful application might be doing several searches asynchronously and then combining their results.

```kotlin
runBlocking {
// asynchronously do a few searches

val combinedTotals = 0.until(5).map {
async {
thingDao.searchAsync {
source {
from(it*5)
size(5)
query(QueryBuilders.matchPhrasePrefixQuery("title","docu"))
}
}
}
}.map {
// get each result and extract the number of hits
it.await().mappedHits.count()
}.reduce { l, r->l+r}

println("We fetched $combinedTotals Things")
}
```

Output:

```
We fetched 25 Things
```

## Development status

Co-routine support is a work in progress in this library and there may be more changes
related to this in future versions. E.g. `Flow` seems like it could be useful when dealing
with scrolling searches.
Co-routine support is still somewhat experimental in this library and there may be more changes
related to this in future versions as our code generator evolves. E.g. `Flow` seems like it
could be useful when dealing with (scrolling) searches. Another topic of attention is using multiple
threads for asynchronous bulk indexing to improve throughput.

However, the generated async functions should be stable to use as is and are useful right now.


---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package io.inbot.eskotlinwrapper.manual

import io.inbot.eskotlinwrapper.AbstractElasticSearchTest
import io.inbot.eskotlinwrapper.IndexDAO
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.search.source
import org.elasticsearch.action.support.WriteRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.crudDao
import org.elasticsearch.client.indices.ReloadAnalyzersRequest
import org.elasticsearch.client.indices.ReloadAnalyzersResponse
import org.elasticsearch.client.reloadAnalyzersAsync
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.query.QueryBuilders
import org.junit.jupiter.api.Test

class CoRoutinesManualTest: AbstractElasticSearchTest(indexPrefix = "manual") {
Expand Down Expand Up @@ -51,12 +54,16 @@ class CoRoutinesManualTest: AbstractElasticSearchTest(indexPrefix = "manual") {
the response when it comes back. Using this is kind of boiler plate heavy.
Luckily, Kotlin has co-routines for asynchronous programming and this library provides co-routine
friendly versions of these functions. They each work pretty much the same way as their synchronous
version except they are marked as suspend and use a `SuspendingActionListener` that uses Kotlin's
`suspendCancellableCoroutine` to wrap the callback that the rest high level client expects.
friendly versions of these functions. These `suspend` functions work pretty much the same way as their
synchronous version except they are marked as suspend and use a `SuspendingActionListener` that uses
Kotlin's `suspendCancellableCoroutine` to wrap the callback that the rest high level client expects.
If you use an asynchronous server framework such as ktor or Spring Boot (in reactive mode), you'll
want to use these.
As of Elasticsearch 7.5.0, all asynchronous calls return a `Cancellable` object that allows you to cancel
the task. Using `suspendCancellableCoRoutine` uses this and this means that if you have some failure
or abort a coroutine scope, all the running tasks are cancelled.
If you use an asynchronous server framework such as Ktor or Spring Boot 2.x (in reactive mode), you'll
want to use the asynchronous functions.
To support co-routines, this project is using a
[code generation plugin](https://github.com/jillesvangurp/es-kotlin-codegen-plugin)
Expand All @@ -71,6 +78,7 @@ class CoRoutinesManualTest: AbstractElasticSearchTest(indexPrefix = "manual") {
// the synchronous version as provided by the RestHighLevel client
val indicesClient = esClient.indices()
val response = indicesClient.reloadAnalyzers(ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT)

// the asynchronous version with a callback as provided by the RestHighLevel client
indicesClient.reloadAnalyzersAsync(ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT, object : ActionListener<ReloadAnalyzersResponse> {
override fun onFailure(e: Exception) {
Expand All @@ -83,7 +91,7 @@ class CoRoutinesManualTest: AbstractElasticSearchTest(indexPrefix = "manual") {
})

runBlocking {
// the coroutine friendly version generated by the code generator plugin
// the coroutine friendly version using a function generated by the code generator plugin
// this is a suspend version so we put it in a runBlocking to get a coroutine scope
// use a more appropriate scope in your own application of course.
val response = indicesClient.reloadAnalyzersAsync(ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT)
Expand All @@ -95,13 +103,14 @@ class CoRoutinesManualTest: AbstractElasticSearchTest(indexPrefix = "manual") {
## IndexDAO Async
Of course, ${mdLink(IndexDAO::class)} has async functions as well and using that is
Of course, ${mdLink(IndexDAO::class)} has async functions as well and using that works
exactly the same as the synchronous version.
"""

blockWithOutput {
runBlocking {
// simply use async versions the same way as you would use the regular versions
// it will suspend at the appropriate moments
thingDao.bulkAsync(refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE) {
1.rangeTo(50).forEach {
index("$it",Thing("document $it"))
Expand All @@ -112,12 +121,42 @@ class CoRoutinesManualTest: AbstractElasticSearchTest(indexPrefix = "manual") {
}
}

+"""
Another useful application might be doing several searches asynchronously and then combining their results.
"""

blockWithOutput {
runBlocking {
// asynchronously do a few searches

val combinedTotals = 0.until(5).map {
async {
thingDao.searchAsync {
source {
from(it*5)
size(5)
query(QueryBuilders.matchPhrasePrefixQuery("title","docu"))
}
}
}
}.map {
// get each result and extract the number of hits
it.await().mappedHits.count()
}.reduce { l, r->l+r}

println("We fetched $combinedTotals Things")
}
}

+"""
## Development status
Co-routine support is a work in progress in this library and there may be more changes
related to this in future versions. E.g. `Flow` seems like it could be useful when dealing
with scrolling searches.
Co-routine support is still somewhat experimental in this library and there may be more changes
related to this in future versions as our code generator evolves. E.g. `Flow` seems like it
could be useful when dealing with (scrolling) searches. Another topic of attention is using multiple
threads for asynchronous bulk indexing to improve throughput.
However, the generated async functions should be stable to use as is and are useful right now.
"""
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/test/kotlin/io/inbot/eskotlinwrapper/manual/ReadmeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class ReadmeTest : AbstractElasticSearchTest(indexPrefix = "manual") {
This client cuts down on the boiler plate and uses Kotlin's DSL features, extension functions, etc. to layer a
friendly API over the underlying client functionality.
Kotlin also has support for co-routines and we use this to make using the asynchronous methods in the Java client a
lot nicer to use. Basics for this are in place but Kotlin's co-routine support is still evolving and some of the things we
use are still labeled experimental.
Kotlin also has support for **co-routines** and we use this to make using the asynchronous methods in the Java client a
lot nicer to use. To do so, we generate kotlin coroutine friendly versions of nearly all asynchronous methods
using a code generator. The generated code uses `suspendCancellableCoroutine` to create suspend functions
that do the right thing if something happens to the coroutine context.
Finally, the despite being a high level client, the official client lacks a few abstractions that result in a lot of
things being left as an exercise to the user. This library provides some helpful abstractions to deal with the common
Expand Down

0 comments on commit 6bbc5de

Please sign in to comment.