This repository was archived by the owner on Dec 25, 2019. It is now read-only.

Commit

✨ kafka ok
crossoverJie committed Nov 1, 2017
1 parent c9a2b0c commit 5199eb6
Showing 7 changed files with 225 additions and 2 deletions.
6 changes: 6 additions & 0 deletions SSM-WEB/pom.xml
@@ -332,6 +332,12 @@
        <artifactId>pagehelper</artifactId>
    </dependency>

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>19.0</version>
    </dependency>


</dependencies>

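Note: the Guava dependency is presumably pulled in for the ThreadFactoryBuilder used by the new ConsumerGroup class further down in this commit.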
@@ -1,7 +1,6 @@
package com.crossoverJie.kafka.official;

import com.crossoverJie.util.StringUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
@@ -81,7 +80,6 @@ public static void main(String[] args) {
        int totalCount = 0 ;
        long totalMin = 0L ;
        int count = 0;
        ObjectMapper mapper = new ObjectMapper();
        KafkaConsumer<String, String> consumer = initKafkaConsumer();

        long startTime = System.currentTimeMillis() ;
101 changes: 101 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/kafka/optimization/ConsumerCallable.java
@@ -0,0 +1,101 @@
package com.crossoverJie.kafka.optimization;



import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Function: consumer task; each instance maintains its own KafkaConsumer and counts the records it reads.
 *
 * @author crossoverJie
 * Date: 2017/10/24 17:40
 * @since JDK 1.8
 */
public class ConsumerCallable implements Callable<ConsumerFuture> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerCallable.class);

    private AtomicInteger totalCount = new AtomicInteger();
    private AtomicLong totalTime = new AtomicLong();

    private AtomicInteger count = new AtomicInteger();

    /**
     * Each thread maintains its own KafkaConsumer instance.
     */
    private final KafkaConsumer<String, String> consumer;

    public ConsumerCallable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
    }


    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    @Override
    public ConsumerFuture call() throws Exception {
        boolean flag = true;
        int failPollTimes = 0;
        long startTime = System.currentTimeMillis();
        while (flag) {
            // Use 200 ms as the poll timeout
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() <= 0) {
                failPollTimes++;

                if (failPollTimes >= 20) {
                    LOGGER.debug("Reached {} consecutive empty polls, exiting", failPollTimes);
                    flag = false;
                }

                continue;
            }

            // Reset the empty-poll counter once records are received
            failPollTimes = 0;

            LOGGER.debug("polled {} records this round", records.count());
            count.addAndGet(records.count());
            // Accumulate only the records from this poll into the overall total
            totalCount.addAndGet(records.count());
            long endTime = System.currentTimeMillis();
            if (count.get() >= 10000) {
                LOGGER.info("this consumer read {} records, use {} milliseconds", count, endTime - startTime);
                totalTime.addAndGet(endTime - startTime);
                startTime = System.currentTimeMillis();
                count.set(0);
            }
            LOGGER.debug("end totalCount={}, totalTime={} ms", totalCount, totalTime);

            /*for (ConsumerRecord<String, String> record : records) {
                // Simply print the message
                LOGGER.debug(record.value() + " consumed " + record.partition() +
                        " message with offset: " + record.offset());
            }*/
        }

        // Release the consumer before returning the aggregated result
        consumer.close();
        return new ConsumerFuture(totalCount.get(), totalTime.get());
    }
}
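Note: a minimal sketch of running a single ConsumerCallable on its own (the class name SingleConsumerDemo is hypothetical, and the broker, group and topic values are assumptions copied from ConsumerThreadMain below):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo, not part of this commit; assumed to live in the same package as ConsumerCallable
public class SingleConsumerDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Broker, group and topic values are taken from ConsumerThreadMain below
        Future<ConsumerFuture> future = pool.submit(
                new ConsumerCallable("localhost:9094", "group1", "test"));
        // get() blocks until the callable gives up after 20 consecutive empty polls
        ConsumerFuture result = future.get();
        System.out.println("records=" + result.getTotalCount() + ", millis=" + result.getTotalTime());
        pool.shutdown();
    }
}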
34 changes: 34 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/kafka/optimization/ConsumerFuture.java
@@ -0,0 +1,34 @@
package com.crossoverJie.kafka.optimization;

/**
 * Function: holds the aggregated result of a consumer thread.
 *
 * @author crossoverJie
 * Date: 2017/10/30 13:21
 * @since JDK 1.8
 */
public class ConsumerFuture {
    private Integer totalCount;
    private Long totalTime;

    public ConsumerFuture(Integer totalCount, Long totalTime) {
        this.totalCount = totalCount;
        this.totalTime = totalTime;
    }

    public Integer getTotalCount() {
        return totalCount;
    }

    public void setTotalCount(Integer totalCount) {
        this.totalCount = totalCount;
    }

    public Long getTotalTime() {
        return totalTime;
    }

    public void setTotalTime(Long totalTime) {
        this.totalTime = totalTime;
    }
}
57 changes: 57 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/kafka/optimization/ConsumerGroup.java
@@ -0,0 +1,57 @@
package com.crossoverJie.kafka.optimization;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Function: creates one ConsumerCallable per thread and runs them on a dedicated thread pool.
 *
 * @author crossoverJie
 * Date: 2017/10/24 17:52
 * @since JDK 1.8
 */
public class ConsumerGroup {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroup.class);
    /**
     * Thread pool
     */
    private ExecutorService threadPool;

    private List<ConsumerCallable> consumers;

    public ConsumerGroup(int threadNum, String groupId, String topic, String brokerList) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-pool-%d").build();

        threadPool = new ThreadPoolExecutor(threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());


        consumers = new ArrayList<ConsumerCallable>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            ConsumerCallable consumerThread = new ConsumerCallable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    /**
     * Submit every consumer task and stop accepting new ones.
     */
    public void execute() {
        long startTime = System.currentTimeMillis();
        for (ConsumerCallable callable : consumers) {
            threadPool.submit(callable);
        }
        // Shut down the pool so no new tasks are accepted; the submitted consumers keep running until they finish
        threadPool.shutdown();
        LOGGER.info("main thread use {} Millis", System.currentTimeMillis() - startTime);
    }
}
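Each thread gets its own KafkaConsumer because KafkaConsumer is not thread-safe; the bounded LinkedBlockingQueue plus AbortPolicy simply rejects any tasks beyond the pool capacity. Since execute() never reads the ConsumerFuture results, here is a hedged sketch of an additional method inside ConsumerGroup that would wait for them (the method name executeAndWait is hypothetical, not part of this commit):

    /**
     * Hypothetical variant of execute() (not part of this commit) that waits for
     * every ConsumerCallable to finish and aggregates the ConsumerFuture results.
     */
    public void executeAndWait() throws Exception {
        List<Future<ConsumerFuture>> futures = new ArrayList<Future<ConsumerFuture>>(consumers.size());
        for (ConsumerCallable callable : consumers) {
            futures.add(threadPool.submit(callable));
        }
        threadPool.shutdown();

        int records = 0;
        long millis = 0L;
        for (Future<ConsumerFuture> future : futures) {
            // get() blocks until that consumer thread exits (after 20 consecutive empty polls)
            ConsumerFuture result = future.get();
            records += result.getTotalCount();
            millis += result.getTotalTime();
        }
        LOGGER.info("all consumers read {} records, total poll time {} ms", records, millis);
    }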
26 changes: 26 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/kafka/optimization/ConsumerThreadMain.java
@@ -0,0 +1,26 @@
package com.crossoverJie.kafka.optimization;

/**
 * Function: entry point that starts a multi-threaded consumer group.
 *
 * @author crossoverJie
 * Date: 2017/10/24 17:52
 * @since JDK 1.8
 */
public class ConsumerThreadMain {
    private static String brokerList = "localhost:9094";
    private static String groupId = "group1";
    private static String topic = "test";

    /**
     * Number of consumer threads
     */
    private static int threadNum = 3;

    public static void main(String[] args) {

        ConsumerGroup consumerGroup = new ConsumerGroup(threadNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}
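Note: Kafka assigns each partition to at most one consumer within a group, so with threadNum = 3 consumers sharing group1, the test topic needs at least 3 partitions for all threads to receive records.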
1 change: 1 addition & 0 deletions SSM-WEB/src/main/resources/producer.properties
@@ -1,3 +1,4 @@
# Cluster address; multiple brokers may be listed
bootstrap.servers=10.19.13.51:9094
acks=all
retries=0
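Note: a minimal sketch of a producer that loads this file (the class name ProducerPropertiesDemo, the topic name and the serializer settings are assumptions; the rest of producer.properties is not shown in this diff):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Hypothetical demo, not part of this commit
public class ProducerPropertiesDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.load(ProducerPropertiesDemo.class.getClassLoader()
                .getResourceAsStream("producer.properties"));
        // Assumed settings; only used if the file does not already define the serializers
        props.putIfAbsent("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.putIfAbsent("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Topic name "test" is taken from ConsumerThreadMain above
        producer.send(new ProducerRecord<String, String>("test", "hello kafka"));
        producer.close();
    }
}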
