Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEOMESA-3315 Redis - add configuration for socket read timeout #3016

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/user/redis/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Parameter Type Description
``redis.connection.pool.size`` Integer Max number of simultaneous connections to use
``redis.connection.pool.validate`` Boolean Test connections when borrowed from the pool. Connections may be closed due to
inactivity, which would cause a transient error if validation is disabled
``redis.connection.timeout`` String Timeout for socket connections to Redis. The timeout is specified as a duration,
e.g. ``10 seconds``. The default value is ``2 seconds``
``redis.pipeline.enabled`` Boolean Enable pipelining of query requests. This reduces network latency, but restricts
queries to a single execution thread
``geomesa.query.threads`` Integer The number of threads to use per query (if not pipelining)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import redis.clients.jedis.{Jedis, JedisPool}

import java.awt.RenderingHints
import java.net.URI
import scala.util.Try
import scala.util.{Failure, Success, Try}

class RedisDataStoreFactory extends DataStoreFactorySpi with LazyLogging {

Expand Down Expand Up @@ -66,6 +66,7 @@ object RedisDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
RedisUrlParam,
RedisCatalogParam,
PoolSizeParam,
SocketTimeoutParam,
QueryThreadsParam,
QueryTimeoutParam,
PipelineParam,
Expand All @@ -89,20 +90,29 @@ object RedisDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
*/
def buildConnection(params: java.util.Map[String, _]): JedisPool = {
ConnectionPoolParam.lookupOpt(params).getOrElse {
val url = RedisUrlParam.lookup(params)
val uri = {
val url = RedisUrlParam.lookup(params)
// if there is no protocol/port, or the url is a valid redis url, use as is
// else use the redis:// protocol to support databases, etc
val parsed =
if (url.indexOf(":") == -1) {
Try(new URI(url))
} else {
parse(url).orElse(parse(s"redis://$url"))
}
parsed match {
case Success(uri) => uri
case Failure(e) =>
throw new IllegalArgumentException(s"Could not create valid Redis connection URI from: $url", e)
}
}

val config = new GenericObjectPoolConfig[Jedis]()
PoolSizeParam.lookupOpt(params).foreach(s => config.setMaxTotal(s.intValue()))
config.setTestOnBorrow(TestConnectionParam.lookup(params))
// if there is no protocol/port, or the url is a valid redis url, use as is
// else use the redis:// protocol to support databases, etc
if (url.indexOf(":") == -1) {
new JedisPool(config, url)
} else {
val uri = parse(url).orElse(parse(s"redis://$url")).getOrElse {
throw new IllegalArgumentException(s"Could not create valid Redis connection URI from: $url")
}
new JedisPool(config, uri)
}
val timeout = SocketTimeoutParam.lookup(params).toMillis.toInt

new JedisPool(config, uri, timeout)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemPropert
import org.locationtech.geomesa.utils.geotools.GeoMesaParam
import redis.clients.jedis.JedisPool

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

package object data extends LazyLogging {
Expand Down Expand Up @@ -55,6 +57,14 @@ package object data extends LazyLogging {
supportsNiFiExpressions = true
)

val SocketTimeoutParam =
new GeoMesaParam[Duration](
"redis.connection.timeout",
"Connection socket timeout for calls to Redis",
default = Duration(redis.clients.jedis.Protocol.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS),
supportsNiFiExpressions = true
)

val TestConnectionParam =
new GeoMesaParam[java.lang.Boolean](
"redis.connection.pool.validate",
Expand Down
Loading