diff --git a/app/common/Util.scala b/app/common/Util.scala index 7848ec7..18ab7fc 100644 --- a/app/common/Util.scala +++ b/app/common/Util.scala @@ -1,5 +1,6 @@ /* * Copyright 2014 Claude Mamo + * Some changes Copyright 2014 Isaac Banner | ibanner56 * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,6 +17,7 @@ package common +import kafka.consumer.async.AsyncLowLevelConsumer import play.api.Logger import scala.concurrent.{Future, Promise} @@ -24,9 +26,7 @@ import com.twitter.zk.{ZNode, ZkClient} import common.Registry.PropertyConstants import models.Zookeeper import play.api.libs.concurrent.Execution.Implicits.defaultContext -import org.apache.zookeeper.KeeperException.{NotEmptyException, NodeExistsException, NoNodeException} -import okapies.finagle.Kafka -import kafka.api.OffsetRequest +import org.apache.zookeeper.KeeperException.NoNodeException object Util { @@ -63,22 +63,15 @@ object Util { def getPartitionsLogSize(topicName: String, partitionLeaders: Seq[String]): Future[Seq[Long]] = { Logger.debug("Getting partition log sizes for topic " + topicName + " from partition leaders " + partitionLeaders.mkString(", ")) - return for { - clients <- Future.sequence(partitionLeaders.map(addr => Future((addr, Kafka.newRichClient(addr))))) - partitionsLogSize <- Future.sequence(clients.zipWithIndex.map { tu => - val addr = tu._1._1 - val client = tu._1._2 - var offset = Future(0L) - - if (!addr.isEmpty) { - offset = twitterToScalaFuture(client.offset(topicName, tu._2, OffsetRequest.LatestTime)).map(_.offsets.head).recover { - case e => Logger.warn("Could not connect to partition leader " + addr + ". Error message: " + e.getMessage); 0L - } - } - client.close() - offset + return for { + clients <- Future.sequence(partitionLeaders.zipWithIndex.map {tuple => + val hostAndPort = tuple._1.split(":") + val partition = tuple._2 + AsyncLowLevelConsumer(topicName, partition, hostAndPort(0), hostAndPort(1).toInt) }) + partitionsLogSize <- Future.sequence(clients.map(client => client.offset)) + closeClients <- Future.sequence(clients.map(client => client.close)) } yield partitionsLogSize } @@ -150,19 +143,24 @@ object Util { } def deleteZNode(zNode: ZNode): Future[ZNode] = { - val delNode = twitterToScalaFuture(zNode.getData()).flatMap { d => - twitterToScalaFuture(zNode.delete(d.stat.getVersion)).recover { - case e: NotEmptyException => { - for { - children <- getZChildren(zNode, Seq("*")) - delChildren <- Future.sequence(children.map(n => deleteZNode(n))) - } yield deleteZNode(zNode) + val deletePromise: Promise[ZNode] = Promise[ZNode] + + getZChildren(zNode, Seq("*")).map({ children => + val sequenceFuture = Future.sequence(children.map(n => deleteZNode(n))) + + sequenceFuture.onSuccess({ case children => + val delNode = twitterToScalaFuture(zNode.getData()).flatMap { d => + twitterToScalaFuture(zNode.delete(d.stat.getVersion)) } - case e: NoNodeException => Future(ZNode) - } - } - //TODO: investigate why actual type is Future[Object] - delNode.asInstanceOf[Future[ZNode]] + delNode.onComplete(zNode => deletePromise complete zNode) + }) + + sequenceFuture.onFailure({ case t => + deletePromise failure t + }) + }) + + deletePromise.future } } diff --git a/app/kafka/consumer/LowLevelConsumer.java b/app/kafka/consumer/LowLevelConsumer.java new file mode 100644 index 0000000..2be3a55 --- /dev/null +++ b/app/kafka/consumer/LowLevelConsumer.java @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2014 the original author or authors and Enernoc Inc. + * See the LICENCE.txt file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + * Authors: Andrew Zafft | azafty468, Isaac Banner | ibanner56 + */ + +package kafka.consumer; + +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.TopicAndPartition; +import kafka.javaapi.*; +import kafka.javaapi.consumer.SimpleConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.HashMap; + +public class LowLevelConsumer { + static final Logger log = LoggerFactory.getLogger(LowLevelConsumer.class); + + private SimpleConsumer consumer; + private String leadBroker; + private String clientName; + private final String topic; + private final int partition; + private final int port; + + public LowLevelConsumer(String topic, int partition, String seedBroker, int port) { + this.topic = topic; + this.partition = partition; + this.leadBroker = seedBroker; + this.port = port; + clientName = "Client_" + topic + "_" + partition; + + consumer = new SimpleConsumer(leadBroker, port, 1000000, 64 * 1024, clientName); + } + + public long startingOffset() { + long offset; + try { + offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName); + } catch (Exception e) { + e.printStackTrace(); + return 0L; + } + return offset; + } + + public long endingOffset() { + long offset; + try { + offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName); + } catch (Exception e) { + e.printStackTrace(); + return 0L; + } + return offset; + } + + public void closeConsumers() { + if (consumer != null) consumer.close(); + } + + private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); + Map requestInfo = new HashMap<>(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + log.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); + return 0; + } + long[] offsets = response.offsets(topic, partition); + return offsets[0]; + } +} diff --git a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala new file mode 100644 index 0000000..599d2fd --- /dev/null +++ b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2014 the original author or authors and Enernoc Inc. + * See the LICENCE.txt file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + * Author: Isaac Banner | ibanner56 + */ + +package kafka.consumer.async + +import kafka.consumer.LowLevelConsumer +import scala.concurrent.future +import scala.concurrent.Future +import play.api.libs.concurrent.Execution.Implicits.defaultContext + +class AsyncLowLevelConsumer(consumer: LowLevelConsumer) { + + def offset: Future[Long] = future { + consumer.endingOffset() + } + + def close = future { + consumer.closeConsumers() + } + +} + +object AsyncLowLevelConsumer { + def apply(topic: String, partition: Int, seedBroker: String, port: Int) = future { + val llc: LowLevelConsumer = new LowLevelConsumer(topic, partition, seedBroker, port) + new AsyncLowLevelConsumer(llc) + } +} \ No newline at end of file