Skip to content
This repository has been archived by the owner on Oct 6, 2018. It is now read-only.

Fixes #36. Fixes #30. #40

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
56 changes: 27 additions & 29 deletions app/common/Util.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright 2014 Claude Mamo
* Some changes Copyright 2014 Isaac Banner | ibanner56
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand All @@ -16,6 +17,7 @@

package common

import kafka.consumer.async.AsyncLowLevelConsumer
import play.api.Logger

import scala.concurrent.{Future, Promise}
Expand All @@ -24,9 +26,7 @@ import com.twitter.zk.{ZNode, ZkClient}
import common.Registry.PropertyConstants
import models.Zookeeper
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import org.apache.zookeeper.KeeperException.{NotEmptyException, NodeExistsException, NoNodeException}
import okapies.finagle.Kafka
import kafka.api.OffsetRequest
import org.apache.zookeeper.KeeperException.NoNodeException

object Util {

Expand Down Expand Up @@ -63,22 +63,15 @@ object Util {

def getPartitionsLogSize(topicName: String, partitionLeaders: Seq[String]): Future[Seq[Long]] = {
Logger.debug("Getting partition log sizes for topic " + topicName + " from partition leaders " + partitionLeaders.mkString(", "))
return for {
clients <- Future.sequence(partitionLeaders.map(addr => Future((addr, Kafka.newRichClient(addr)))))
partitionsLogSize <- Future.sequence(clients.zipWithIndex.map { tu =>
val addr = tu._1._1
val client = tu._1._2
var offset = Future(0L)

if (!addr.isEmpty) {
offset = twitterToScalaFuture(client.offset(topicName, tu._2, OffsetRequest.LatestTime)).map(_.offsets.head).recover {
case e => Logger.warn("Could not connect to partition leader " + addr + ". Error message: " + e.getMessage); 0L
}
}

client.close()
offset
return for {
clients <- Future.sequence(partitionLeaders.zipWithIndex.map {tuple =>
val hostAndPort = tuple._1.split(":")
val partition = tuple._2
AsyncLowLevelConsumer(topicName, partition, hostAndPort(0), hostAndPort(1).toInt)
})
partitionsLogSize <- Future.sequence(clients.map(client => client.offset))
closeClients <- Future.sequence(clients.map(client => client.close))
} yield partitionsLogSize
}

Expand Down Expand Up @@ -150,19 +143,24 @@ object Util {
}

def deleteZNode(zNode: ZNode): Future[ZNode] = {
val delNode = twitterToScalaFuture(zNode.getData()).flatMap { d =>
twitterToScalaFuture(zNode.delete(d.stat.getVersion)).recover {
case e: NotEmptyException => {
for {
children <- getZChildren(zNode, Seq("*"))
delChildren <- Future.sequence(children.map(n => deleteZNode(n)))
} yield deleteZNode(zNode)
val deletePromise: Promise[ZNode] = Promise[ZNode]

getZChildren(zNode, Seq("*")).map({ children =>
val sequenceFuture = Future.sequence(children.map(n => deleteZNode(n)))

sequenceFuture.onSuccess({ case children =>
val delNode = twitterToScalaFuture(zNode.getData()).flatMap { d =>
twitterToScalaFuture(zNode.delete(d.stat.getVersion))
}
case e: NoNodeException => Future(ZNode)
}
}

//TODO: investigate why actual type is Future[Object]
delNode.asInstanceOf[Future[ZNode]]
delNode.onComplete(zNode => deletePromise complete zNode)
})

sequenceFuture.onFailure({ case t =>
deletePromise failure t
})
})

deletePromise.future
}
}
93 changes: 93 additions & 0 deletions app/kafka/consumer/LowLevelConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Copyright (C) 2014 the original author or authors and Enernoc Inc.
* See the LICENCE.txt file distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
* Authors: Andrew Zafft | azafty468, Isaac Banner | ibanner56
*/

package kafka.consumer;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.HashMap;

public class LowLevelConsumer {
static final Logger log = LoggerFactory.getLogger(LowLevelConsumer.class);

private SimpleConsumer consumer;
private String leadBroker;
private String clientName;
private final String topic;
private final int partition;
private final int port;

public LowLevelConsumer(String topic, int partition, String seedBroker, int port) {
this.topic = topic;
this.partition = partition;
this.leadBroker = seedBroker;
this.port = port;
clientName = "Client_" + topic + "_" + partition;

consumer = new SimpleConsumer(leadBroker, port, 1000000, 64 * 1024, clientName);
}

public long startingOffset() {
long offset;
try {
offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
} catch (Exception e) {
e.printStackTrace();
return 0L;
}
return offset;
}

public long endingOffset() {
long offset;
try {
offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
} catch (Exception e) {
e.printStackTrace();
return 0L;
}
return offset;
}

public void closeConsumers() {
if (consumer != null) consumer.close();
}

private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {
log.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
}
45 changes: 45 additions & 0 deletions app/kafka/consumer/async/AsyncLowLevelConsumer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (C) 2014 the original author or authors and Enernoc Inc.
* See the LICENCE.txt file distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
* Author: Isaac Banner | ibanner56
*/

package kafka.consumer.async

import kafka.consumer.LowLevelConsumer
import scala.concurrent.future
import scala.concurrent.Future
import play.api.libs.concurrent.Execution.Implicits.defaultContext

class AsyncLowLevelConsumer(consumer: LowLevelConsumer) {

def offset: Future[Long] = future {
consumer.endingOffset()
}

def close = future {
consumer.closeConsumers()
}

}

object AsyncLowLevelConsumer {
def apply(topic: String, partition: Int, seedBroker: String, port: Int) = future {
val llc: LowLevelConsumer = new LowLevelConsumer(topic, partition, seedBroker, port)
new AsyncLowLevelConsumer(llc)
}
}