
Commit

Add descriptions
Add comments
leeegeng committed Aug 16, 2021
1 parent 51ff842 commit 390027c
Showing 17 changed files with 91 additions and 8 deletions.
34 changes: 34 additions & 0 deletions README.md
@@ -1 +1,35 @@
"# flink-test"
# wc
- WordCount

  Reads data from a file and counts words as a batch job; the data structure is a DataSet.
- StreamWordCount

  Reads data from a file and converts it into a DataStream.
- SocketWordCount

  Reads streaming data from a socket and counts words.

# pojo

POJO classes used for testing; Flink streams can process both POJO and tuple types.

# window

Windowing tests.

# source

Data source tests.

# sink

Sink tests.

# tableapi

Table API, SQL, and UDF tests.

# function

Function tests.
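For reference, a minimal sketch of the socket word count described in the README above, written against the DataStream API. The class name, host, and port are placeholders; the repo's SocketWordCount reads the host and port from --host/--port arguments instead.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWordCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read raw text lines from a socket (host and port are placeholders).
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        lines
            // Split each line into (word, 1) pairs.
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split("\\s+")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            // Lambdas lose generic type information, so declare it explicitly.
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            // Key by the word and keep a running count.
            .keyBy(pair -> pair.f0)
            .sum(1)
            .print();

        env.execute("socket word count sketch");
    }
}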
@@ -26,6 +26,12 @@
import java.util.ArrayList;
import java.util.Comparator;

/**
 * Daily status statistics, computed once per day starting at 00:00.
 * Tracks how long each device has stayed continuously in its current status, as well as each device's daily vehicle-pass data.
 * Uses ValueState to keep the previous day's vehicle-pass statistics.
 * MapState is kept in mind for future extension: weekly statistics could cache each day's data and run a timed aggregation every Monday.
*/
public class StatusStatisticsByDay {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
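A minimal sketch of the ValueState idea described in the comment above, assuming a keyed stream of (deviceId, runningDailyTotal) tuples; the class, field names, and output format are illustrative, and the daily 00:00 timer used by the real job is omitted.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by device id; each input element carries that device's running vehicle-pass total.
public class PreviousDayCountSketch
        extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    // Per-key state holding the total reported for the previous day.
    private transient ValueState<Long> previousDayTotal;

    @Override
    public void open(Configuration parameters) {
        previousDayTotal = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous-day-total", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        Long previous = previousDayTotal.value();          // null before the first day completes
        long passedToday = previous == null ? value.f1 : value.f1 - previous;
        out.collect(value.f0 + " passed " + passedToday + " vehicles since the previous snapshot");
        previousDayTotal.update(value.f1);                 // keep today's total for tomorrow
    }
}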
6 changes: 5 additions & 1 deletion src/main/java/com/kedacom/apitest/function/TopNFunction.java
@@ -24,7 +24,11 @@
import java.util.Comparator;
import java.util.concurrent.TimeUnit;

// topN vehicle-pass data
/**
 * Top-N ranking of vehicle-pass data.
 * Approach: 1. Call aggregate() with two arguments, an aggregate function and a window function, to count the records within each window.
 *           2. Then keyBy the window end time and rank the vehicle-pass counts for each window end time.
*/
public class TopNFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
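A minimal sketch of step 1 of the approach above: aggregate() taking an incremental AggregateFunction plus a WindowFunction that attaches the window end time. The 5-minute processing-time window and the stream-of-device-ids input shape are assumptions, not the repo's actual code.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowCountSketch {

    // Step 1a: incremental counter, one accumulator per key and window.
    public static class CountAgg implements AggregateFunction<String, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String deviceId, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // Step 1b: attach the window end time so step 2 can keyBy the window end and rank.
    public static class AttachWindowEnd
            implements WindowFunction<Long, Tuple3<String, Long, Long>, String, TimeWindow> {
        @Override
        public void apply(String deviceId, TimeWindow window, Iterable<Long> counts,
                          Collector<Tuple3<String, Long, Long>> out) {
            out.collect(Tuple3.of(deviceId, window.getEnd(), counts.iterator().next()));
        }
    }

    // deviceIds carries one element per passing vehicle (assumed input shape).
    public static DataStream<Tuple3<String, Long, Long>> countsPerWindow(DataStream<String> deviceIds) {
        return deviceIds
                .keyBy(id -> id)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                .aggregate(new CountAgg(), new AttachWindowEnd());
    }
}

Step 2 would then keyBy the window-end field of the emitted tuples and rank the counts per window, for example in a KeyedProcessFunction.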
3 changes: 3 additions & 0 deletions src/main/java/com/kedacom/apitest/sink/KafkaSinkTest.java
@@ -17,6 +17,9 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Kafka sink test
*/
public class KafkaSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
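A minimal Kafka sink sketch using FlinkKafkaProducer from flink-connector-kafka; the broker address, topic name, and sample records are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Some records to write (placeholder data).
        DataStream<String> records = env.fromElements("device-1,online", "device-2,fault");

        // Serialize each record as a UTF-8 string and write it to the topic.
        records.addSink(new FlinkKafkaProducer<>(
                "localhost:9092",        // broker list (placeholder)
                "device-status",         // target topic (placeholder)
                new SimpleStringSchema()));

        env.execute("kafka sink sketch");
    }
}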
@@ -8,6 +8,9 @@
import java.lang.reflect.Array;
import java.util.Arrays;

/**
 * Generates simulated input data for testing
*/
public class DeviceInfoReading {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3 changes: 3 additions & 0 deletions src/main/java/com/kedacom/apitest/source/KafkaSourceTest.java
@@ -12,6 +12,9 @@
import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka source test
*/
public class KafkaSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
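A minimal Kafka source sketch using FlinkKafkaConsumer from flink-connector-kafka; the broker address, group id, and topic name are placeholders.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection settings (placeholders).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-test");

        // Consume the topic as a stream of UTF-8 strings.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("device-status", new SimpleStringSchema(), props));

        stream.print();
        env.execute("kafka source sketch");
    }
}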
3 changes: 3 additions & 0 deletions src/main/java/com/kedacom/apitest/source/ReduceTest.java
@@ -12,6 +12,9 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Vehicle-pass statistics computed with reduce
*/
public class ReduceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
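A minimal sketch of a keyed reduce for vehicle-pass totals, assuming (deviceId, passCount) tuples as input; the names and sample data are illustrative.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (deviceId, passCount) events (placeholder data).
        DataStream<Tuple2<String, Long>> passes = env.fromElements(
                Tuple2.of("device-1", 1L), Tuple2.of("device-2", 1L), Tuple2.of("device-1", 1L));

        // Keep a running total per device: each reduce call merges the stored
        // partial result with the newest element for that key.
        passes.keyBy(pass -> pass.f0)
              .reduce((acc, next) -> Tuple2.of(acc.f0, acc.f1 + next.f1))
              .print();

        env.execute("reduce sketch");
    }
}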
3 changes: 3 additions & 0 deletions src/main/java/com/kedacom/apitest/source/SelfSourceTest.java
@@ -20,6 +20,9 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Custom data source test
*/
public class SelfSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
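A minimal custom source sketch implementing SourceFunction; the emitted (deviceId, timestamp) shape and the one-second pacing are assumptions, not the repo's source.

import java.util.Random;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits a random device id with the current timestamp once per second until cancelled.
public class RandomDeviceSource implements SourceFunction<Tuple2<String, Long>> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            ctx.collect(Tuple2.of("device-" + random.nextInt(5), System.currentTimeMillis()));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

It would be attached with env.addSource(new RandomDeviceSource()).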
3 changes: 3 additions & 0 deletions src/main/java/com/kedacom/apitest/source/SplitStreamTest.java
@@ -21,6 +21,9 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Stream splitting test
*/
public class SplitStreamTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
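A minimal stream-splitting sketch. It uses side outputs via a ProcessFunction rather than the older split()/select() API, so it may differ from the repo's approach; the record format is a placeholder.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("device-1,online", "device-2,fault");

        // Anonymous subclass so the element type of the tag is captured.
        final OutputTag<String> faultTag = new OutputTag<String>("fault") {};

        // Route fault records to the side output, everything else to the main output.
        SingleOutputStreamOperator<String> healthy = input.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.contains("fault")) {
                            ctx.output(faultTag, value);
                        } else {
                            out.collect(value);
                        }
                    }
                });

        healthy.print("healthy");
        healthy.getSideOutput(faultTag).print("fault");

        env.execute("split stream sketch");
    }
}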
@@ -13,7 +13,9 @@

import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;

//
/**
 * Keyed state test
*/
public class KeyedStateApplicationTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3 changes: 3 additions & 0 deletions src/main/java/com/kedacom/apitest/state/OpetateStateTest.java
@@ -18,6 +18,9 @@
import java.util.Collections;
import java.util.List;

/**
 * Operator state test
*/
public class OpetateStateTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
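A minimal operator state sketch using CheckpointedFunction with a ListState-backed buffer; the buffering sink and its flush threshold are illustrative, not the repo's OpetateStateTest.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Buffers elements and restores the buffer from operator state after a failure.
public class BufferingSinkSketch implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= 10) {           // illustrative flush threshold
            // ... write the buffered elements somewhere, then:
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedBuffer.clear();
        checkpointedBuffer.addAll(buffer);   // copy the in-memory buffer into operator state
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("buffered-elements", String.class));
        for (String element : checkpointedBuffer.get()) {
            buffer.add(element);             // restore after a failure
        }
    }
}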
3 changes: 3 additions & 0 deletions src/main/java/com/kedacom/apitest/tableapi/TableTest1.java
@@ -12,6 +12,9 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Comparison test of functions, the Table API, and SQL
*/
public class TableTest1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
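A minimal sketch that runs the same aggregation through both the Table API and SQL, assuming a (deviceId, cnt) tuple stream; field, view, and class names are placeholders.

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableVsSqlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<String, Long>> passes = env.fromElements(
                Tuple2.of("device-1", 3L), Tuple2.of("device-2", 5L), Tuple2.of("device-1", 2L));

        // Name the tuple fields when converting the stream to a table.
        Table table = tableEnv.fromDataStream(passes, $("deviceId"), $("cnt"));

        // Table API version of the aggregation.
        Table apiResult = table.groupBy($("deviceId"))
                               .select($("deviceId"), $("cnt").sum().as("total"));

        // Equivalent SQL version.
        tableEnv.createTemporaryView("passes", table);
        Table sqlResult = tableEnv.sqlQuery(
                "SELECT deviceId, SUM(cnt) AS total FROM passes GROUP BY deviceId");

        // Grouped aggregations update their results, so use retract streams.
        tableEnv.toRetractStream(apiResult, Row.class).print("table-api");
        tableEnv.toRetractStream(sqlResult, Row.class).print("sql");

        env.execute("table api vs sql sketch");
    }
}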
@@ -13,6 +13,9 @@
import org.apache.flink.table.runtime.aggregate.AggregateAggFunction;
import org.apache.flink.types.Row;

/**
 * SQL UDF test
*/
public class UdfAggregateFunctionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
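A minimal aggregate UDF sketch (an average over long values), registered here with the older registerFunction call for broad version compatibility; the function name, accumulator class, and the assumed "passes" view are illustrative, not the repo's UDF.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

public class AvgCntUdfSketch {

    // Accumulator is a plain public POJO so Flink can manage it as state.
    public static class AvgAccumulator {
        public long sum = 0L;
        public long count = 0L;
    }

    // Aggregate UDF: average of the aggregated long values.
    public static class AvgCnt extends AggregateFunction<Double, AvgAccumulator> {

        @Override
        public AvgAccumulator createAccumulator() {
            return new AvgAccumulator();
        }

        @Override
        public Double getValue(AvgAccumulator acc) {
            return acc.count == 0L ? null : (double) acc.sum / acc.count;
        }

        // Called once per input row of the group.
        public void accumulate(AvgAccumulator acc, Long value) {
            if (value != null) {
                acc.sum += value;
                acc.count += 1L;
            }
        }
    }

    // Registration and use in SQL; tableEnv and the "passes" view are assumed to exist.
    public static Table averagePerDevice(StreamTableEnvironment tableEnv) {
        tableEnv.registerFunction("avgCnt", new AvgCnt());
        return tableEnv.sqlQuery(
                "SELECT deviceId, avgCnt(cnt) AS avg_cnt FROM passes GROUP BY deviceId");
    }
}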
@@ -9,6 +9,10 @@
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

/**
 * Scalar UDF test
*/

public class UdfScalarFunctionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
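A minimal scalar UDF sketch; the hash function, its registered name, and the assumed "passes" view are illustrative, not necessarily the UDF used in the repo.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class ScalarUdfSketch {

    // Scalar UDF: one output value per input value.
    public static class HashId extends ScalarFunction {
        public int eval(String s) {
            return s == null ? 0 : s.hashCode();
        }
    }

    // Registration and use in SQL; tableEnv and the "passes" view are assumed to exist.
    public static Table withHash(StreamTableEnvironment tableEnv) {
        tableEnv.createTemporarySystemFunction("hashId", HashId.class);
        return tableEnv.sqlQuery("SELECT deviceId, hashId(deviceId) AS id_hash FROM passes");
    }
}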
8 changes: 5 additions & 3 deletions src/main/java/com/kedacom/wc/SocketWordCount.java
@@ -9,9 +9,11 @@
import org.apache.flink.util.Collector;

@Slf4j
// Submission parameters
// com.kedacom.wc.StreamWordCount
// --host 172.16.64.85 --port 9999
/**
 * Reads a text stream from a socket and counts words.
 * The jar can be submitted from the Flink UI, class: com.kedacom.wc.StreamWordCount, arguments: --host 172.16.64.85 --port 9999
*/

public class SocketWordCount {
public static void main(String[] args) throws Exception {
System.out.println("test");
6 changes: 3 additions & 3 deletions src/main/java/com/kedacom/wc/StreamWordCount.java
@@ -7,9 +7,9 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Submission parameters
// com.kedacom.wc.StreamWordCount
// --host 172.16.64.85 --port 9999
/**
 * Reads a file and converts it into a DataStream
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
System.out.println("test");
4 changes: 4 additions & 0 deletions src/main/java/com/kedacom/wc/WordCount.java
@@ -6,6 +6,10 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
* DataSet
 * Batch processing of a file
*/
public class WordCount {
public static void main(String[] args) throws Exception {
System.out.println("test");
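A minimal batch word count sketch with the DataSet API, matching the README description of WordCount; the input path is a placeholder.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCountSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the whole file as a bounded DataSet (path is a placeholder).
        DataSet<String> lines = env.readTextFile("src/main/resources/words.txt");

        lines
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            })
            .groupBy(0)   // group on the word field
            .sum(1)       // sum the counts
            .print();     // print() triggers execution in the DataSet API
    }
}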
