Skip to content

Commit

Permalink
Merge pull request #31 from yarosman/master
Browse files Browse the repository at this point in the history
Netty 4.1 support
  • Loading branch information
sergkh authored Feb 7, 2018
2 parents 101c800 + 0316e62 commit 8c9d999
Show file tree
Hide file tree
Showing 30 changed files with 1,369 additions and 581 deletions.
9 changes: 0 additions & 9 deletions .bintray.sh

This file was deleted.

4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: scala
scala:
- 2.11.8
- 2.12.4
jdk:
- oraclejdk8
env:
Expand Down Expand Up @@ -41,4 +41,4 @@ script:
- sbt clean test +package

after_success:
- test "${TRAVIS_PULL_REQUEST}" = 'false' && test "${TRAVIS_TAG}" != '' && sh "$TRAVIS_BUILD_DIR/.bintray.sh" && sbt +publish bintrayRelease
- test "${TRAVIS_PULL_REQUEST}" = 'false' && test "${TRAVIS_TAG}" != '' && sbt +publish
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* protocol pipelining
* connection multiplexing
* in-memory redis implementation which can come in handy for testing (in active development)
* scala 2.12 support
* scala 2.11, 2.12 support

Changes from [original version](https://github.com/andreyk0/redis-client-scala-netty):
* added support of [scripting](http://redis.io/commands#scripting) commands
Expand All @@ -16,6 +16,7 @@ Changes from [original version](https://github.com/andreyk0/redis-client-scala-n
* moved to sbt
* in-memory client
* Distributed lock implementation ([Redlock](http://redis.io/topics/distlock))
* Netty 4.1.x

## Building
$ sbt package
Expand Down
16 changes: 9 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ organization := "com.impactua"

version := sys.env.getOrElse("TRAVIS_TAG", "1.4.0-SNAPSHOT")

scalaVersion := "2.11.8"
scalaVersion := "2.12.4"

crossScalaVersions := Seq("2.10.4", "2.11.8", "2.12.0")
crossScalaVersions := Seq("2.11.8", "2.12.0")

licenses += ("MIT", url("http://opensource.org/licenses/MIT"))

Expand All @@ -15,15 +15,17 @@ publishArtifact := true
publishArtifact in Test := false

bintrayReleaseOnPublish := false

bintrayPackage := name.value

bintrayOrganization in bintray := Some("sergkh")
bintrayPackageLabels := Seq("scala", "redis", "netty")

concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)

val nettyVersion = "4.1.20.Final"

libraryDependencies ++= Seq(
"io.netty" % "netty" % "3.10.6.Final",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"com.storm-enroute" %% "scalameter" % "0.8.2" % "test"
"io.netty" % "netty-handler" % nettyVersion,
"io.netty" % "netty-transport-native-epoll" % nettyVersion classifier "linux-x86_64",
"org.scalatest" %% "scalatest" % "3.0.4" % Test,
"com.storm-enroute" %% "scalameter" % "0.8.2" % Test
)
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.13
sbt.version=1.1.0
3 changes: 1 addition & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")

addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.1")
5 changes: 5 additions & 0 deletions src/main/scala/com/fotolog/redis/Conversions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ trait DefaultConverters {
def write(i: Long) = i.toString.getBytes(charset)
}

implicit object FloatConverter extends BinaryConverter[Float] {
def read(b: Array[Byte]) = new String(b).toFloat
def write(i: Float) = i.toString.getBytes(charset)
}

implicit object DoubleConverter extends BinaryConverter[Double] {
def read(b: Array[Byte]) = new String(b).toDouble
def write(i: Double) = i.toString.getBytes(charset)
Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/com/fotolog/redis/RedisClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.net.URI
import java.util.concurrent.TimeUnit

import com.fotolog.redis.commands._
import com.fotolog.redis.connections.{InMemoryRedisConnection, Netty3RedisConnection, RedisConnection}
import com.fotolog.redis.connections.{InMemoryRedisConnection, Netty4RedisConnection, RedisConnection}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
Expand All @@ -27,7 +27,8 @@ object RedisClient {
Option(redisUri.getScheme) match {
case Some("redis") | None =>
val port = if (redisUri.getPort > 0) redisUri.getPort else 6379
val client = new RedisClient(new Netty3RedisConnection(redisUri.getHost, port), timeout)

val client = new RedisClient(new Netty4RedisConnection(redisUri.getHost, port), timeout)

for (userInfo <- Option(redisUri.getUserInfo)) {
val password = userInfo.stripPrefix(":")
Expand Down Expand Up @@ -58,7 +59,7 @@ object RedisClient {
class RedisClient(val r: RedisConnection, val timeout: Duration) extends GenericCommands with StringCommands
with HashCommands with ListCommands
with SetCommands with ScriptingCommands with PubSubCommands
with HyperLogLogCommands {
with HyperLogLogCommands with SortedSetCommands {

def isConnected: Boolean = r.isOpen
def shutdown() { r.shutdown() }
Expand Down
51 changes: 51 additions & 0 deletions src/main/scala/com/fotolog/redis/RedisMessage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.fotolog.redis

import com.fotolog.redis.connections.BulkDataResult
import io.netty.util.CharsetUtil

/**
* @author Yaroslav Derman <[email protected]>.
* created on 12.03.2017.
*/
sealed trait RedisMessage

trait SimpleMessage extends RedisMessage {
def asOptBin: Option[Array[Byte]] = None
}

trait ComplexMessage extends RedisMessage

case object NullRedisMessage extends SimpleMessage

case object EmptyArrayRedisMessage extends SimpleMessage

case class StringRedisMessage(content: String) extends SimpleMessage {
override def asOptBin: Option[Array[Byte]] = Option(content).map(_.getBytes(CharsetUtil.UTF_8))
}

case class RawRedisMessage(content: Array[Byte]) extends SimpleMessage {
override def toString: String = new String(content)

override def asOptBin: Option[Array[Byte]] = Some(content)
}

case class IntRedisMessage(number: Int) extends SimpleMessage {
override def asOptBin: Option[Array[Byte]] = Some(number.toString.getBytes)
}

case class ErrorRedisMessage(error: String) extends SimpleMessage {
override def asOptBin: Option[Array[Byte]] = Option(error).map(_.getBytes(CharsetUtil.UTF_8))
}

case class ArrayHeaderRedisMessage(length: Int) extends SimpleMessage {
override def asOptBin: Option[Array[Byte]] = None
}

case class ArrayRedisMessage(children: List[RedisMessage]) extends ComplexMessage {

//TODO: fixme
def asBulk: Seq[BulkDataResult] = children.flatMap {
case s: SimpleMessage => Seq(BulkDataResult(s.asOptBin))
case c: ArrayRedisMessage => c.asBulk
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.fotolog.redis.codecs

import io.netty.channel.ChannelHandlerContext

/**
* @author Yaroslav Derman <[email protected]>.
* created on 02.03.2017.
*/
private[codecs] trait ChannelExceptionHandler {
def handleException(ctx: ChannelHandlerContext, ex: Throwable) {
ctx.close()
}
}
72 changes: 72 additions & 0 deletions src/main/scala/com/fotolog/redis/codecs/RedisArrayAgregator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.fotolog.redis.codecs

import java.util


import com.fotolog.redis.codecs.RedisArrayAgregator.AggregateState
import com.fotolog.redis._
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.MessageToMessageDecoder

import scala.collection.mutable.ArrayBuffer

/**
* @author Yaroslav Derman <[email protected]>.
* created on 13.03.2017.
*/
class RedisArrayAgregator extends MessageToMessageDecoder[RedisMessage] {

val queue: util.Deque[AggregateState] = new util.ArrayDeque[AggregateState]

override def decode(ctx: ChannelHandlerContext, msg: RedisMessage, out: util.List[AnyRef]): Unit = {
msg match {
case header: ArrayHeaderRedisMessage if header.length == 0 =>
out.add(EmptyArrayRedisMessage)

case header: ArrayHeaderRedisMessage if header.length == -1 =>
out.add(NullRedisMessage)

case ArrayHeaderRedisMessage(length) if queue.isEmpty =>
queue.push(new AggregateState(length))

case ArrayHeaderRedisMessage(length) if !queue.isEmpty =>
queue.push(new AggregateState(length))

case proxyMsg if queue.isEmpty =>
out.add(proxyMsg)

case partMsg if !queue.isEmpty =>

var promMsg: RedisMessage = partMsg

while (!queue.isEmpty) {
val current = queue.peekFirst()
current.add(promMsg)

if (current.isFull) {
promMsg = ArrayRedisMessage(current.msgs.toList)
queue.pop()
} else {
promMsg = null
return
}
}

Option(promMsg).foreach(msg => out.add(msg))
}
}

}

object RedisArrayAgregator {

case class AggregateState(length: Int, msgs: ArrayBuffer[RedisMessage]) {

def isFull: Boolean = length == msgs.size

def this(length: Int) = this(length, new ArrayBuffer[RedisMessage](length))

def add(msg: RedisMessage): Unit = msgs.append(msg)
}

}
35 changes: 35 additions & 0 deletions src/main/scala/com/fotolog/redis/codecs/RedisCommandEncoder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.fotolog.redis.codecs

import com.fotolog.redis.connections.ResultFuture
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.MessageToByteEncoder

/**
* @author Yaroslav Derman <[email protected]>.
* created on 02.03.2017.
*/
@Sharable
private[redis] class RedisCommandEncoder extends MessageToByteEncoder[ResultFuture] {

import com.fotolog.redis.connections.Cmd._

override def encode(ctx: ChannelHandlerContext, msg: ResultFuture, out: ByteBuf): Unit = {
binaryCmd(msg.cmd.asBin, out)
}

private def binaryCmd(cmdParts: Seq[Array[Byte]], out: ByteBuf) = {
out.writeBytes(ARRAY_START)
out.writeBytes(cmdParts.length.toString.getBytes)
out.writeBytes(EOL)

for (p <- cmdParts) {
out.writeBytes(STRING_START)
out.writeBytes(p.length.toString.getBytes)
out.writeBytes(EOL)
out.writeBytes(p)
out.writeBytes(EOL)
}
}
}
93 changes: 93 additions & 0 deletions src/main/scala/com/fotolog/redis/codecs/RedisResponseDecoder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.fotolog.redis.codecs

import java.nio.charset.Charset
import java.util

import com.fotolog.redis._
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.ByteToMessageDecoder
import io.netty.util.{ByteProcessor, CharsetUtil}

/**
* @author Yaroslav Derman <[email protected]>.
* created on 02.03.2017.
*/
private[redis] class RedisResponseDecoder extends ByteToMessageDecoder with ChannelExceptionHandler {

val charset: Charset = CharsetUtil.UTF_8
var responseType: ResponseType = Unknown

//TODO: split into frame without '/r/n'
override def decode(ctx: ChannelHandlerContext, in: ByteBuf, out: util.List[AnyRef]): Unit = {
responseType match {
case Unknown if in.isReadable =>
responseType = ResponseType(in.readByte)

case Unknown if !in.isReadable =>

case BulkData => readAsciiLine(in).foreach { line =>
line.toInt match {
case -1 =>
responseType = Unknown
out.add(NullRedisMessage)
case n =>
responseType = BinaryData(n)
}
}

case BinaryData(len) =>
if (in.readableBytes >= (len + 2)) {
// +2 for eol
responseType = Unknown
val bytes = new Array[Byte](len)
in.readBytes(bytes)
in.skipBytes(2)
out.add(RawRedisMessage(bytes))
}

case MultiBulkData =>
readAsciiLine(in).map { line =>
responseType = Unknown
out.add(ArrayHeaderRedisMessage(line.toInt))
}

case Integer =>
readAsciiLine(in).map { line =>
responseType = Unknown
out.add(IntRedisMessage(line.toInt))
}

case Error =>
readAsciiLine(in).map { line =>
responseType = Unknown
out.add(ErrorRedisMessage(line))
}

case SingleLine =>
readAsciiLine(in).map { line =>
responseType = Unknown
out.add(StringRedisMessage(line))
}
}
}

private def findEndOfLine(buffer: ByteBuf): Int = {
val i = buffer.forEachByte(ByteProcessor.FIND_LF)
if (i > 0 && buffer.getByte(i - 1) == '\r') i - 1 else -1
}

private def readAsciiLine(buf: ByteBuf): Option[String] = if (!buf.isReadable) None else {
findEndOfLine(buf) match {
case -1 => None
case n =>
val line = buf.toString(buf.readerIndex, n - buf.readerIndex, charset)
buf.skipBytes(line.length + 2)
Some(line)
}
}

override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
handleException(ctx, cause)
}
}
Loading

0 comments on commit 8c9d999

Please sign in to comment.