Skip to content

Commit

Permalink
Revert "Remove blocking inside SocketPool."
Browse files Browse the repository at this point in the history
This reverts commit d2e792a.
  • Loading branch information
JonathanLennox committed Oct 21, 2024
1 parent 6182860 commit 1f3cd1b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 23 deletions.
55 changes: 44 additions & 11 deletions src/main/kotlin/org/ice4j/socket/SocketPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/
package org.ice4j.socket

import java.io.Closeable
import java.net.DatagramSocket
import java.net.DatagramSocketImpl
import java.net.SocketAddress
import java.nio.channels.DatagramChannel
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.Semaphore

/** A pool of datagram sockets all bound on the same port.
*
Expand Down Expand Up @@ -55,6 +56,8 @@ class SocketPool(
Runtime.getRuntime().availableProcessors()
}

private val semaphore = Semaphore(numSockets)

private val sockets = buildList {
val multipleSockets = numSockets > 1
var bindAddr = address
Expand All @@ -71,7 +74,7 @@ class SocketPool(
}
}

private val sockIndex = AtomicLong(0)
private val availableSockets = ArrayDeque(sockets)

/** The socket on which packets will be received. */
val receiveSocket: DatagramSocket
Expand All @@ -80,18 +83,48 @@ class SocketPool(
// sockets, spreading load?
get() = sockets.last()

/** Gets a socket on which packets can be sent, chosen from among all the available send sockets. */
val sendSocket: DatagramSocket
get() {
if (numSockets == 1) {
return sockets.first()
interface SocketHolder : Closeable {
val socket: DatagramSocket
}

/** Socket holder with autocloseable semantics, to ensure the socket is returned. */
private inner class MySocketHolder : SocketHolder {
override val socket = acquireSendSocket()
private var closed = false
override fun close() {
if (!closed) {
returnSendSocket(socket)
closed = true
}
return sockets[nextIndex()]
}
}

private fun nextIndex(): Int {
val nextIdx = sockIndex.getAndIncrement()
return nextIdx.rem(numSockets).toInt()
/** Trivial socket holder that gives out a single unique socket. */
private inner class TrivialSocketHolder : SocketHolder {
override val socket = sockets.first()
override fun close() {}
}

/** Gets a send socket holder. Should be used with Kotlin [use] or Java try-with-resources. May block. */
fun getSendSocket(): SocketHolder {
if (numSockets == 1) {
return TrivialSocketHolder()
}
return MySocketHolder()
}

private fun acquireSendSocket(): DatagramSocket {
semaphore.acquire()
synchronized(availableSockets) {
return availableSockets.removeFirst()
}
}

private fun returnSendSocket(socket: DatagramSocket) {
synchronized(availableSockets) {
availableSockets.addLast(socket)
}
semaphore.release()
}

fun close() {
Expand Down
26 changes: 14 additions & 12 deletions src/test/kotlin/org/ice4j/socket/SocketPoolTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,30 @@ class SocketPoolTest : ShouldSpec() {
context("Getting multiple send sockets from a pool") {
val numSockets = 4
val pool = SocketPool(loopbackAny, numSockets)
val sockets = mutableListOf<DatagramSocket>()
val holders = mutableListOf<SocketPool.SocketHolder>()
should("be possible") {
repeat(numSockets) {
sockets.add(pool.sendSocket)
holders.add(pool.getSendSocket())
}
}
// All sockets should be distinct
sockets.toSet().size shouldBe sockets.size
holders.forEach { it.close() }
pool.close()
}

context("Packets sent from each of the send sockets in the pool") {
val numSockets = 4
val pool = SocketPool(loopbackAny, numSockets)
val local = pool.receiveSocket.localSocketAddress
val sockets = mutableListOf<DatagramSocket>()
val holders = mutableListOf<SocketPool.SocketHolder>()
repeat(numSockets) {
sockets.add(pool.sendSocket)
holders.add(pool.getSendSocket())
}
sockets.forEachIndexed { i, it ->
holders.forEachIndexed { i, it ->
val buf = i.toString().toByteArray()
val packet = DatagramPacket(buf, buf.size, local)
it.send(packet)
it.socket.send(packet)
}
holders.forEach { it.close() }

should("be received") {
for (i in 0 until numSockets) {
Expand All @@ -70,7 +70,6 @@ class SocketPoolTest : ShouldSpec() {
packet.socketAddress shouldBe local
}
}
pool.close()
}

context("The number of send sockets") {
Expand All @@ -81,7 +80,9 @@ class SocketPoolTest : ShouldSpec() {

repeat(2 * numSockets) {
// This should cycle through all the available send sockets
sockets.add(pool.sendSocket)
pool.getSendSocket().use { holder ->
sockets.add(holder.socket)
}
}

should("be correct") {
Expand Down Expand Up @@ -124,8 +125,9 @@ class SocketPoolTest : ShouldSpec() {

private fun sendToSocket(count: Int) {
for (i in 0 until count) {
val socket = pool.sendSocket
socket.send(DatagramPacket(buf, BUFFER_SIZE, destAddr))
pool.getSendSocket().use {
it.socket.send(DatagramPacket(buf, BUFFER_SIZE, destAddr))
}
}
}

Expand Down

0 comments on commit 1f3cd1b

Please sign in to comment.