Skip to content

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
FutaoSmile committed Mar 19, 2020
1 parent 43c3056 commit 51c882c
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 34 deletions.
37 changes: 37 additions & 0 deletions learn-kit/src/main/java/com/futao/learn/kit/kit/TimeKit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.futao.learn.kit.kit;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;

/**
* @author futao
* @date 2020/3/16.
*/
public class TimeKit {


/**
* LocalDateTime 转为 Date
*
* @param localDateTime
* @return
*/
public Date parseLocalDateTime2Date(LocalDateTime localDateTime) {
Instant instant = localDateTime.atZone(ZoneOffset.ofHours(8)).toInstant();
return Date.from(instant);
}

/**
* LocalDate 转为 Date
*
* @param localDate
* @return
*/
public Date parseLocalDate2Date(LocalDate localDate) {
Instant instant = localDate.atStartOfDay(ZoneOffset.ofHours(8)).toInstant();
return Date.from(instant);
}
}
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<!-- <module>learn-socket</module>-->
<module>springboot-learn-mybatis-plus</module>
<module>springboot-learn-rabbitmq</module>
<!-- <module>learn-kit</module>-->
<!-- <module>learn-kit</module>-->
</modules>

<!--引入SpringBootParent的第二种方式-->
Expand Down Expand Up @@ -145,12 +145,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-devtools</artifactId>-->
<!-- <scope>runtime</scope>-->
<!-- <optional>true</optional>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ public class UserService extends ServiceImpl<UserMapper, User> {
public void insert(User user) {
save(user);
}

public static void main(String[] args) {
System.out.println(1 / 60D + (2 / 60D));
}
}
17 changes: 17 additions & 0 deletions springboot-learn-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,23 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>


<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -29,6 +31,7 @@
* @author futao
* @date 2020/3/14.
*/
@MapperScan("com.futao.springboot.learn.rabbitmq.dao")
@Slf4j
@SpringBootApplication
public class RabbitMQApplication implements ApplicationRunner {
Expand All @@ -46,6 +49,7 @@ public static void main(String[] args) {
public void run(ApplicationArguments args) throws Exception {
//设置消费者在断开与RabbitMQ的连接之后自动重新连接
cachingConnectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
System.out.println(cachingConnectionFactory.getRabbitConnectionFactory().isAutomaticRecoveryEnabled());

//消息投递成功与否的监听,可以用来保证消息100%投递到rabbitMQ。(如果某条消息(通过id判定)在一定时间内未收到该回调,则重发该消息)
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
Expand All @@ -58,5 +62,11 @@ public void run(ApplicationArguments args) throws Exception {
log.info("cause:{}", cause);
});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

package com.futao.springboot.learn.rabbitmq.config;

import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import org.apache.ibatis.reflection.MetaObject;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.LocalDateTime;

/**
* @author futao
* @date 2020/3/9.
*/
@Configuration
public class MybatisPlusConfig implements MetaObjectHandler {


/**
* 分页插件
*
* @return
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
return paginationInterceptor;
}

@Override
public void insertFill(MetaObject metaObject) {
setFieldValByName("createDateTime", LocalDateTime.now(), metaObject);
}

@Override
public void updateFill(MetaObject metaObject) {
setFieldValByName("updateDateTime", LocalDateTime.now(), metaObject);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.futao.springboot.learn.rabbitmq.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.futao.springboot.learn.rabbitmq.model.UserModel;

/**
* @author futao
* @date 2020/3/19.
*/
public interface UserMapper extends BaseMapper<UserModel> {
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.futao.springboot.learn.rabbitmq.model;

import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Tolerate;

import java.time.LocalDate;
import java.time.LocalDateTime;

/**
* @author futao
Expand All @@ -14,14 +17,28 @@
@Getter
@Setter
@Builder
public class UserModel {
@TableName("user")
public class UserModel extends Model<UserModel> {

@Tolerate
public UserModel() {
}

private int id;
@TableId(value = "id", type = IdType.UUID)
private String id;

@TableField("user_name")
private String userName;

@TableField("age")
private int age;

@TableField("birthday")
private LocalDate birthday;

@TableField(value = "create_date_time", fill = FieldFill.INSERT)
private LocalDateTime createDateTime;

@TableField(value = "update_date_time", fill = FieldFill.UPDATE)
private LocalDateTime updateDateTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

Expand All @@ -17,20 +18,26 @@
public class Definition {


@Bean
@Bean("userExchange")
public FanoutExchange userFanoutExchange() {
return new FanoutExchange("user-exchange", true, false);
}

@Bean
@Bean("userQueue")
public Queue userQueue() {
return new Queue("user-queue", true);
}

@Bean
public Binding userBind(Queue userQueue, FanoutExchange userFanoutExchange) {
@Bean("userQEBind")
public Binding userBind(
@Qualifier("userQueue")
Queue userQueue,
@Qualifier("userExchange")
FanoutExchange userFanoutExchange
) {
return BindingBuilder.bind(userQueue)
.to(userFanoutExchange);
}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.futao.springboot.learn.rabbitmq.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.futao.springboot.learn.rabbitmq.dao.UserMapper;
import com.futao.springboot.learn.rabbitmq.model.UserModel;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
Expand All @@ -8,6 +11,7 @@
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Map;

Expand All @@ -21,11 +25,15 @@
@Component
public class Receiver {

@Resource
private UserMapper userMapper;

@RabbitHandler
@RabbitListener(queues = "user-queue")
public void userReceiver(String body, Channel channel, @Headers Map<String, Object> headers) throws IOException {
log.info("开始消费[{}]", body);
//如开启了手动ACK,则需要这样设置
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
userMapper.insert(JSON.parseObject(body, UserModel.class));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.futao.springboot.learn.rabbitmq.rabbitmq;

import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.futao.springboot.learn.rabbitmq.model.UserModel;
import com.futao.springboot.learn.rabbitmq.util.CommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand All @@ -11,6 +13,12 @@
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author futao
Expand All @@ -24,30 +32,36 @@ public class Sender implements ApplicationRunner {
@Autowired
private RabbitTemplate rabbitTemplate;


AtomicInteger atomicInteger = new AtomicInteger(1);

ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "threadNum" + atomicInteger.getAndIncrement());
}
});

public void userSender() throws InterruptedException {
for (int i = 0; i < 100; i++) {
UserModel userModel = UserModel.builder()
.userName("futao" + i)
.age(i)
.birthday(LocalDate.now())
.id(i)
.build();
rabbitTemplate.convertAndSend("user-exchange", "", JSON.toJSONString(userModel), new CorrelationData(String.valueOf(userModel.getId())));
Thread.sleep(100);
log.info("send{}", i);
for (int j = 0; j < 10; j++) {
executorService.execute(() -> {
for (int i = 0; i < 7; i++) {
UserModel userModel = UserModel.builder()
.userName(CommonUtil.getRandomJianHan(3) + i)
.age(i)
.birthday(LocalDate.now(ZoneOffset.ofHours(8)))
.id(IdUtil.simpleUUID())
.createDateTime(LocalDateTime.now())
.build();
rabbitTemplate.convertAndSend("user-exchange", "", JSON.toJSONString(userModel), new CorrelationData(String.valueOf(userModel.getId())));
log.info("send{}", i);
}
});
}
}

@Override
public void run(ApplicationArguments args) throws Exception {
// for (int i = 0; i < 10; i++) {
// new Thread(() -> {
// try {
// userSender();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
// }
// userSender();
}
}
Loading

0 comments on commit 51c882c

Please sign in to comment.