class UserDownloader(
private val api: NetworkService
) {
private val users = mutableListOf<User>()
fun downloaded(): List<User> = users.toList()
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
users.add(newUser)
}
}
//1
import kotlinx.coroutines.*
class UserDownloader(
private val api: NetworkService
) {
private val users = mutableListOf<User>()
fun downloaded(): List<User> = users.toList()
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
users += newUser
}
}
class User(val name: String)
interface NetworkService {
suspend fun fetchUser(id: Int): User
}
class FakeNetworkService : NetworkService {
override suspend fun fetchUser(id: Int): User {
delay(2)
return User("User$id")
}
}
suspend fun main() {
val downloader = UserDownloader(FakeNetworkService())
coroutineScope {
repeat(1_000_000) {
launch {
downloader.fetchUser(it)
}
}
}
print(downloader.downloaded().size) // ~998242
}
//2
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
//sampleStart
var counter = 0
fun main() = runBlocking {
massiveRun {
counter++
}
println(counter) // ~567231
}
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
//sampleEnd
//3
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
//sampleStart
var counter = 0
fun main() = runBlocking {
val lock = Any()
massiveRun {
synchronized(lock) { // We are blocking threads!
counter++
}
}
println("Counter = $counter") // 1000000
}
//sampleEnd
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
//4
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger
//sampleStart
private var counter = AtomicInteger()
fun main() = runBlocking {
massiveRun {
counter.incrementAndGet()
}
println(counter.get()) // 1000000
}
//sampleEnd
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
//5
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger
//sampleStart
private var counter = AtomicInteger()
fun main() = runBlocking {
massiveRun {
counter.set(counter.get() + 1)
}
println(counter.get()) // ~430467
}
//sampleEnd
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
//6
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicReference
//sampleStart
class UserDownloader(
private val api: NetworkService
) {
private val users = AtomicReference(listOf<User>())
fun downloaded(): List<User> = users.get()
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
users.getAndUpdate { it + newUser }
}
}
//sampleEnd
class User(val name: String)
interface NetworkService {
suspend fun fetchUser(id: Int): User
}
class FakeNetworkService : NetworkService {
override suspend fun fetchUser(id: Int): User {
delay(2)
return User("User$id")
}
}
suspend fun main() {
val downloader = UserDownloader(FakeNetworkService())
coroutineScope {
repeat(1_000_000) {
launch {
downloader.fetchUser(it)
}
}
}
print(downloader.downloaded().size) // 1000000
}
//7
import kotlinx.coroutines.*
import java.util.concurrent.Executors
//sampleStart
val dispatcher = Dispatchers.IO
.limitedParallelism(1)
var counter = 0
fun main() = runBlocking {
massiveRun {
withContext(dispatcher) {
counter++
}
}
println(counter) // 1000000
}
//sampleEnd
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
//8
import kotlinx.coroutines.*
import java.util.concurrent.Executors
//sampleStart
class UserDownloader(
private val api: NetworkService
) {
private val users = mutableListOf<User>()
private val dispatcher = Dispatchers.IO
.limitedParallelism(1)
suspend fun downloaded(): List<User> =
withContext(dispatcher) {
users.toList()
}
suspend fun fetchUser(id: Int) = withContext(dispatcher) {
val newUser = api.fetchUser(id)
users += newUser
}
}
//sampleEnd
class User(val name: String)
interface NetworkService {
suspend fun fetchUser(id: Int): User
}
class FakeNetworkService : NetworkService {
override suspend fun fetchUser(id: Int): User {
delay(2)
return User("User$id")
}
}
suspend fun main() {
val downloader = UserDownloader(FakeNetworkService())
coroutineScope {
repeat(1_000_000) {
launch {
downloader.fetchUser(it)
}
}
}
print(downloader.downloaded().size) // ~1000000
}
//9
import kotlinx.coroutines.*
import java.util.concurrent.Executors
//sampleStart
class UserDownloader(
private val api: NetworkService
) {
private val users = mutableListOf<User>()
private val dispatcher = Dispatchers.IO
.limitedParallelism(1)
suspend fun downloaded(): List<User> =
withContext(dispatcher) {
users.toList()
}
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
withContext(dispatcher) {
users += newUser
}
}
}
//sampleEnd
class User(val name: String)
interface NetworkService {
suspend fun fetchUser(id: Int): User
}
class FakeNetworkService : NetworkService {
override suspend fun fetchUser(id: Int): User {
delay(2)
return User("User$id")
}
}
suspend fun main() {
val downloader = UserDownloader(FakeNetworkService())
coroutineScope {
repeat(1_000_000) {
launch {
downloader.fetchUser(it)
}
}
}
print(downloader.downloaded().size) // ~1000000
}
//10
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
//sampleStart
suspend fun main() = coroutineScope {
repeat(5) {
launch {
delayAndPrint()
}
}
}
val mutex = Mutex()
suspend fun delayAndPrint() {
mutex.lock()
delay(1000)
println("Done")
mutex.unlock()
}
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
//sampleEnd
//11
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
//sampleStart
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
massiveRun {
mutex.withLock {
counter++
}
}
println(counter) // 1000000
}
//sampleEnd
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
//12
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
//sampleStart
suspend fun main() {
val mutex = Mutex()
println("Started")
mutex.withLock {
mutex.withLock {
println("Will never be printed")
}
}
}
// Started
// (runs forever)
//sampleEnd
//13
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.system.measureTimeMillis
class MessagesRepository {
private val messages = mutableListOf<String>()
private val mutex = Mutex()
suspend fun add(message: String) = mutex.withLock {
delay(1000) // we simulate network call
messages.add(message)
}
}
suspend fun main() {
val repo = MessagesRepository()
val timeMillis = measureTimeMillis {
coroutineScope {
repeat(5) {
launch {
repo.add("Message$it")
}
}
}
}
println(timeMillis) // ~5120
}
//14
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
class MessagesRepository {
private val messages = mutableListOf<String>()
private val dispatcher = Dispatchers.IO
.limitedParallelism(1)
suspend fun add(message: String) =
withContext(dispatcher) {
delay(1000) // we simulate network call
messages.add(message)
}
}
suspend fun main() {
val repo = MessagesRepository()
val timeMillis = measureTimeMillis {
coroutineScope {
repeat(5) {
launch {
repo.add("Message$it")
}
}
}
}
println(timeMillis) // 1058
}
class MongoUserRepository(
//...
) : UserRepository {
private val mutex = Mutex()
override suspend fun updateUser(
userId: String,
userUpdate: UserUpdate
): Unit = mutex.withLock {
// Yes, update should happen on db,
// not via multiple functions,
// this is just an example.
val currentUser = getUser(userId) // Deadlock!
deleteUser(userId) // Deadlock!
addUser(currentUser.updated(userUpdate)) // Deadlock!
}
override suspend fun getUser(
userId: String
): User = mutex.withLock {
// ...
}
override suspend fun deleteUser(
userId: String
): Unit = mutex.withLock {
// ...
}
override suspend fun addUser(
user: User
): User = mutex.withLock {
// ...
}
}
//15
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
suspend fun main() = coroutineScope {
val semaphore = Semaphore(2)
repeat(5) {
launch {
semaphore.withPermit {
delay(1000)
print(it)
}
}
}
}
// 01
// (1 sec)
// 23
// (1 sec)
// 4
class LimitedNetworkUserRepository(
private val api: UserApi
) {
// We limit to 10 concurrent requests
private val semaphore = Semaphore(10)
suspend fun requestUser(userId: String) =
semaphore.withPermit {
api.requestUser(userId)
}
}