From 51ff842057c213dc0c22296692f60a37f39d0c07 Mon Sep 17 00:00:00 2001
From: leeegeng <52368324+leeegeng@users.noreply.github.com>
Date: Mon, 16 Aug 2021 13:24:47 +0800
Subject: [PATCH] commit
sample for use flink
---
.idea/.gitignore | 8 +
.idea/codeStyles/codeStyleConfig.xml | 5 +
.idea/compiler.xml | 17 ++
.idea/encodings.xml | 7 +
.idea/jarRepositories.xml | 20 ++
...__ch_qos_logback_logback_classic_1_2_4.xml | 13 +
...ven__ch_qos_logback_logback_core_1_2_4.xml | 13 +
..._com_esotericsoftware_kryo_kryo_2_24_0.xml | 13 +
...com_esotericsoftware_minlog_minlog_1_2.xml | 13 +
...ackson_core_jackson_annotations_2_12_4.xml | 13 +
...erxml_jackson_core_jackson_core_2_12_4.xml | 13 +
...l_jackson_core_jackson_databind_2_12_4.xml | 13 +
..._datatype_jackson_datatype_jdk8_2_12_4.xml | 13 +
...atatype_jackson_datatype_jsr310_2_12_4.xml | 13 +
..._jackson_module_parameter_names_2_12_4.xml | 13 +
...ven__com_github_luben_zstd_jni_1_4_3_1.xml | 13 +
...ven__com_github_scopt_scopt_2_11_3_5_0.xml | 13 +
..._com_google_code_findbugs_jsr305_1_3_9.xml | 13 +
.../Maven__com_twitter_chill_2_11_0_7_6.xml | 13 +
.../Maven__com_twitter_chill_java_0_7_6.xml | 13 +
...m_typesafe_akka_akka_actor_2_11_2_5_21.xml | 13 +
...ypesafe_akka_akka_protobuf_2_11_2_5_21.xml | 13 +
...m_typesafe_akka_akka_slf4j_2_11_2_5_21.xml | 13 +
..._typesafe_akka_akka_stream_2_11_2_5_21.xml | 13 +
.../Maven__com_typesafe_config_1_3_3.xml | 13 +
...om_typesafe_ssl_config_core_2_11_0_3_7.xml | 13 +
.../Maven__commons_cli_commons_cli_1_3_1.xml | 13 +
..._collections_commons_collections_3_2_2.xml | 13 +
.../Maven__commons_io_commons_io_2_7.xml | 13 +
...nnotation_jakarta_annotation_api_1_3_5.xml | 13 +
...he_calcite_avatica_avatica_core_1_17_0.xml | 13 +
...g_apache_commons_commons_compress_1_20.xml | 13 +
...org_apache_commons_commons_lang3_3_3_2.xml | 13 +
...__org_apache_commons_commons_math3_3_5.xml | 13 +
..._apache_flink_flink_annotations_1_13_0.xml | 13 +
...apache_flink_flink_clients_2_11_1_13_0.xml | 13 +
...ache_flink_flink_connector_base_1_13_0.xml | 13 +
...che_flink_flink_connector_files_1_13_0.xml | 13 +
...link_flink_connector_kafka_2_11_1_13_0.xml | 13 +
...en__org_apache_flink_flink_core_1_13_0.xml | 13 +
...he_flink_flink_file_sink_common_1_13_0.xml | 13 +
...rg_apache_flink_flink_hadoop_fs_1_13_0.xml | 13 +
...en__org_apache_flink_flink_java_1_13_0.xml | 13 +
...apache_flink_flink_metrics_core_1_13_0.xml | 13 +
...ache_flink_flink_optimizer_2_11_1_13_0.xml | 13 +
...ink_queryable_state_client_java_1_13_0.xml | 13 +
...apache_flink_flink_runtime_2_11_1_13_0.xml | 13 +
...g_apache_flink_flink_scala_2_11_1_13_0.xml | 13 +
...ache_flink_flink_shaded_asm_7_7_1_13_0.xml | 13 +
...che_flink_flink_shaded_guava_18_0_13_0.xml | 13 +
...flink_flink_shaded_jackson_2_12_1_13_0.xml | 13 +
...k_flink_shaded_netty_4_1_49_Final_13_0.xml | 13 +
...k_flink_shaded_zookeeper_3_3_4_14_13_0.xml | 13 +
...flink_flink_streaming_java_2_11_1_13_0.xml | 13 +
...link_flink_streaming_scala_2_11_1_13_0.xml | 13 +
...ache_flink_flink_table_api_java_1_13_0.xml | 13 +
...link_table_api_java_bridge_2_11_1_13_0.xml | 13 +
...link_flink_table_api_scala_2_11_1_13_0.xml | 13 +
...ink_table_api_scala_bridge_2_11_1_13_0.xml | 13 +
...apache_flink_flink_table_common_1_13_0.xml | 13 +
..._flink_flink_table_planner_2_11_1_13_0.xml | 13 +
..._flink_table_planner_blink_2_11_1_13_0.xml | 13 +
..._flink_table_runtime_blink_2_11_1_13_0.xml | 13 +
...k_flink_walkthrough_common_2_11_1_13_0.xml | 13 +
..._org_apache_flink_force_shading_1_13_0.xml | 13 +
...__org_apache_kafka_kafka_clients_2_4_1.xml | 13 +
..._apache_logging_log4j_log4j_api_2_12_1.xml | 13 +
...apache_logging_log4j_log4j_core_2_12_1.xml | 13 +
..._logging_log4j_log4j_slf4j_impl_2_12_1.xml | 13 +
...he_logging_log4j_log4j_to_slf4j_2_14_1.xml | 13 +
..._tomcat_embed_tomcat_embed_core_9_0_50.xml | 13 +
...he_tomcat_embed_tomcat_embed_el_9_0_50.xml | 13 +
...at_embed_tomcat_embed_websocket_9_0_50.xml | 13 +
..._org_clapper_grizzled_slf4j_2_11_1_3_2.xml | 13 +
...odehaus_janino_commons_compiler_3_0_11.xml | 13 +
...ven__org_codehaus_janino_janino_3_0_11.xml | 13 +
...ven__org_javassist_javassist_3_24_0_GA.xml | 13 +
.../Maven__org_lz4_lz4_java_1_6_0.xml | 13 +
.../Maven__org_objenesis_objenesis_2_1.xml | 13 +
...aven__org_projectlombok_lombok_1_18_20.xml | 13 +
...reactivestreams_reactive_streams_1_0_2.xml | 13 +
..._modules_scala_java8_compat_2_11_0_7_0.xml | 13 +
...es_scala_parser_combinators_2_11_1_0_4.xml | 13 +
...cala_lang_modules_scala_xml_2_11_1_0_5.xml | 13 +
..._org_scala_lang_scala_compiler_2_11_12.xml | 13 +
...__org_scala_lang_scala_library_2_11_12.xml | 13 +
...__org_scala_lang_scala_reflect_2_11_12.xml | 13 +
.../Maven__org_slf4j_jul_to_slf4j_1_7_32.xml | 13 +
.../Maven__org_slf4j_slf4j_api_1_7_15.xml | 13 +
...springframework_boot_spring_boot_2_5_3.xml | 13 +
...k_boot_spring_boot_autoconfigure_2_5_3.xml | 13 +
...amework_boot_spring_boot_starter_2_5_3.xml | 13 +
...rk_boot_spring_boot_starter_json_2_5_3.xml | 13 +
...boot_spring_boot_starter_logging_2_5_3.xml | 13 +
..._boot_spring_boot_starter_tomcat_2_5_3.xml | 13 +
...ork_boot_spring_boot_starter_web_2_5_3.xml | 13 +
...__org_springframework_spring_aop_5_3_8.xml | 13 +
...org_springframework_spring_beans_5_3_8.xml | 13 +
...g_springframework_spring_context_5_3_8.xml | 13 +
..._org_springframework_spring_core_5_3_8.xml | 13 +
...pringframework_spring_expression_5_3_8.xml | 13 +
...__org_springframework_spring_jcl_5_3_8.xml | 13 +
...__org_springframework_spring_web_5_3_9.xml | 13 +
...rg_springframework_spring_webmvc_5_3_9.xml | 13 +
..._org_xerial_snappy_snappy_java_1_1_8_3.xml | 13 +
.../Maven__org_yaml_snakeyaml_1_28.xml | 13 +
.idea/misc.xml | 13 +
.idea/modules.xml | 8 +
data.txt/1 | 39 +++
data.txt/2 | 39 +++
data.txt/3 | 39 +++
data.txt/4 | 38 +++
data.txt/5 | 38 +++
data.txt/6 | 39 +++
data.txt/7 | 39 +++
data.txt/8 | 39 +++
flink-test.iml | 86 ++++++
pom.xml | 127 +++++++++
.../function/StatusStatisticsByDay.java | 159 +++++++++++
.../apitest/function/TopNFunction.java | 161 +++++++++++
.../kedacom/apitest/sink/KafkaSinkTest.java | 62 +++++
.../apitest/source/DeviceInfoReading.java | 30 ++
.../apitest/source/KafkaSourceTest.java | 55 ++++
.../kedacom/apitest/source/ReduceTest.java | 60 ++++
.../apitest/source/SelfSourceTest.java | 76 +++++
.../apitest/source/SplitStreamTest.java | 103 +++++++
.../state/KeyedStateApplicationTest.java | 88 ++++++
.../kedacom/apitest/state/KeyedStateTest.java | 64 +++++
.../apitest/state/OpetateStateTest.java | 87 ++++++
.../kedacom/apitest/tableapi/TableTest1.java | 73 +++++
.../udf/UdfAggregateFunctionTest.java | 75 +++++
.../tableapi/udf/UdfScalarFunctionTest.java | 63 +++++
.../tableapi/udf/UdfTableFunctionTest.java | 70 +++++
.../apitest/window/SocketWindowTest.java | 115 ++++++++
.../kedacom/apitest/window/WatermarkTest.java | 262 ++++++++++++++++++
.../kedacom/apitest/window/WindowTest1.java | 116 ++++++++
.../java/com/kedacom/pojo/CarNumCount.java | 14 +
.../java/com/kedacom/pojo/DeviceInfo.java | 16 ++
.../com/kedacom/pojo/DeviceStatusStt.java | 16 ++
.../java/com/kedacom/wc/SocketWordCount.java | 52 ++++
.../java/com/kedacom/wc/StreamWordCount.java | 42 +++
src/main/java/com/kedacom/wc/WordCount.java | 35 +++
src/main/resources/carnum.txt | 12 +
src/main/resources/deviceinfo.txt | 12 +
src/main/resources/hello.txt | 6 +
src/main/resources/log4j2.properties | 28 ++
target/classes/carnum.txt | 12 +
.../function/StatusStatisticsByDay$1.class | Bin 0 -> 1188 bytes
...tatusStatisticsByDay$ProcessDayStt$1.class | Bin 0 -> 1692 bytes
.../StatusStatisticsByDay$ProcessDayStt.class | Bin 0 -> 6467 bytes
.../function/StatusStatisticsByDay.class | Bin 0 -> 6996 bytes
.../apitest/function/TopNFunction$1.class | Bin 0 -> 1014 bytes
.../function/TopNFunction$CarCountAgg.class | Bin 0 -> 1815 bytes
.../function/TopNFunction$ResultWindow.class | Bin 0 -> 2540 bytes
.../TopNFunction$TopNHotCarNum$1.class | Bin 0 -> 1590 bytes
.../function/TopNFunction$TopNHotCarNum.class | Bin 0 -> 5745 bytes
.../apitest/function/TopNFunction.class | Bin 0 -> 6557 bytes
.../sink/KafkaSinkTest$DeviceSource.class | Bin 0 -> 3034 bytes
.../kedacom/apitest/sink/KafkaSinkTest.class | Bin 0 -> 3929 bytes
.../apitest/source/DeviceInfoReading.class | Bin 0 -> 2260 bytes
.../apitest/source/KafkaSourceTest.class | Bin 0 -> 4868 bytes
.../source/ReduceTest$DeviceSource.class | Bin 0 -> 3028 bytes
.../kedacom/apitest/source/ReduceTest.class | Bin 0 -> 5111 bytes
.../source/SelfSourceTest$DeviceSource.class | Bin 0 -> 3044 bytes
.../apitest/source/SelfSourceTest.class | Bin 0 -> 5043 bytes
.../apitest/source/SplitStreamTest$1.class | Bin 0 -> 2282 bytes
.../apitest/source/SplitStreamTest$2.class | Bin 0 -> 1639 bytes
.../apitest/source/SplitStreamTest$3.class | Bin 0 -> 2076 bytes
.../source/SplitStreamTest$DeviceSource.class | Bin 0 -> 3048 bytes
.../apitest/source/SplitStreamTest.class | Bin 0 -> 4215 bytes
...ateApplicationTest$StatusSttFunction.class | Bin 0 -> 3296 bytes
.../state/KeyedStateApplicationTest.class | Bin 0 -> 4796 bytes
.../state/KeyedStateTest$MyKeyedState.class | Bin 0 -> 2598 bytes
.../apitest/state/KeyedStateTest.class | Bin 0 -> 4244 bytes
.../state/OpetateStateTest$CountMapper.class | Bin 0 -> 2177 bytes
.../OpetateStateTest$MyCountMapper.class | Bin 0 -> 2451 bytes
.../apitest/state/OpetateStateTest.class | Bin 0 -> 4899 bytes
.../apitest/tableapi/TableTest1$1.class | Bin 0 -> 1144 bytes
.../kedacom/apitest/tableapi/TableTest1.class | Bin 0 -> 7883 bytes
...fAggregateFunctionTest$AverageCarNum.class | Bin 0 -> 2324 bytes
.../udf/UdfAggregateFunctionTest.class | Bin 0 -> 5857 bytes
.../udf/UdfScalarFunctionTest$HashCode.class | Bin 0 -> 759 bytes
.../tableapi/udf/UdfScalarFunctionTest.class | Bin 0 -> 5488 bytes
.../udf/UdfTableFunctionTest$SplitName.class | Bin 0 -> 1353 bytes
.../tableapi/udf/UdfTableFunctionTest.class | Bin 0 -> 5619 bytes
.../apitest/window/SocketWindowTest$1.class | Bin 0 -> 1395 bytes
.../apitest/window/SocketWindowTest$2.class | Bin 0 -> 3361 bytes
.../apitest/window/SocketWindowTest$3.class | Bin 0 -> 1162 bytes
.../apitest/window/SocketWindowTest.class | Bin 0 -> 5497 bytes
.../window/WatermarkTest$CarCountAgg.class | Bin 0 -> 1813 bytes
.../window/WatermarkTest$DeviceSource.class | Bin 0 -> 3054 bytes
.../WatermarkTest$MyTimeAssigner$1.class | Bin 0 -> 1592 bytes
.../window/WatermarkTest$MyTimeAssigner.class | Bin 0 -> 1385 bytes
...rmarkTest$MyTimestampAndWatermarks$1.class | Bin 0 -> 2414 bytes
...termarkTest$MyTimestampAndWatermarks.class | Bin 0 -> 1496 bytes
.../window/WatermarkTest$ResultWindow.class | Bin 0 -> 2538 bytes
.../WatermarkTest$TopNHotCarNum$1.class | Bin 0 -> 1585 bytes
.../window/WatermarkTest$TopNHotCarNum.class | Bin 0 -> 5856 bytes
.../apitest/window/WatermarkTest.class | Bin 0 -> 5115 bytes
.../apitest/window/WindowTest1$1.class | Bin 0 -> 1834 bytes
.../apitest/window/WindowTest1$2.class | Bin 0 -> 3630 bytes
.../window/WindowTest1$DeviceSource.class | Bin 0 -> 3042 bytes
.../kedacom/apitest/window/WindowTest1.class | Bin 0 -> 5217 bytes
.../com/kedacom/pojo/CarNumCount.class | Bin 0 -> 2436 bytes
.../classes/com/kedacom/pojo/DeviceInfo.class | Bin 0 -> 2413 bytes
.../com/kedacom/pojo/DeviceStatusStt.class | Bin 0 -> 2819 bytes
.../wc/SocketWordCount$MyFlatMap.class | Bin 0 -> 1890 bytes
.../com/kedacom/wc/SocketWordCount.class | Bin 0 -> 3096 bytes
.../wc/StreamWordCount$MyFlatMap.class | Bin 0 -> 1890 bytes
.../com/kedacom/wc/StreamWordCount.class | Bin 0 -> 2420 bytes
.../com/kedacom/wc/WordCount$MyFlatMap.class | Bin 0 -> 1866 bytes
target/classes/com/kedacom/wc/WordCount.class | Bin 0 -> 1923 bytes
target/classes/deviceinfo.txt | 12 +
target/classes/hello.txt | 6 +
target/classes/log4j2.properties | 28 ++
target/flink-test-1.0-SNAPSHOT.jar | Bin 0 -> 99166 bytes
target/maven-archiver/pom.properties | 5 +
.../compile/default-compile/createdFiles.lst | 65 +++++
.../compile/default-compile/inputFiles.lst | 24 ++
.../default-testCompile/inputFiles.lst | 0
220 files changed, 4018 insertions(+)
create mode 100644 .idea/.gitignore
create mode 100644 .idea/codeStyles/codeStyleConfig.xml
create mode 100644 .idea/compiler.xml
create mode 100644 .idea/encodings.xml
create mode 100644 .idea/jarRepositories.xml
create mode 100644 .idea/libraries/Maven__ch_qos_logback_logback_classic_1_2_4.xml
create mode 100644 .idea/libraries/Maven__ch_qos_logback_logback_core_1_2_4.xml
create mode 100644 .idea/libraries/Maven__com_esotericsoftware_kryo_kryo_2_24_0.xml
create mode 100644 .idea/libraries/Maven__com_esotericsoftware_minlog_minlog_1_2.xml
create mode 100644 .idea/libraries/Maven__com_fasterxml_jackson_core_jackson_annotations_2_12_4.xml
create mode 100644 .idea/libraries/Maven__com_fasterxml_jackson_core_jackson_core_2_12_4.xml
create mode 100644 .idea/libraries/Maven__com_fasterxml_jackson_core_jackson_databind_2_12_4.xml
create mode 100644 .idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jdk8_2_12_4.xml
create mode 100644 .idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jsr310_2_12_4.xml
create mode 100644 .idea/libraries/Maven__com_fasterxml_jackson_module_jackson_module_parameter_names_2_12_4.xml
create mode 100644 .idea/libraries/Maven__com_github_luben_zstd_jni_1_4_3_1.xml
create mode 100644 .idea/libraries/Maven__com_github_scopt_scopt_2_11_3_5_0.xml
create mode 100644 .idea/libraries/Maven__com_google_code_findbugs_jsr305_1_3_9.xml
create mode 100644 .idea/libraries/Maven__com_twitter_chill_2_11_0_7_6.xml
create mode 100644 .idea/libraries/Maven__com_twitter_chill_java_0_7_6.xml
create mode 100644 .idea/libraries/Maven__com_typesafe_akka_akka_actor_2_11_2_5_21.xml
create mode 100644 .idea/libraries/Maven__com_typesafe_akka_akka_protobuf_2_11_2_5_21.xml
create mode 100644 .idea/libraries/Maven__com_typesafe_akka_akka_slf4j_2_11_2_5_21.xml
create mode 100644 .idea/libraries/Maven__com_typesafe_akka_akka_stream_2_11_2_5_21.xml
create mode 100644 .idea/libraries/Maven__com_typesafe_config_1_3_3.xml
create mode 100644 .idea/libraries/Maven__com_typesafe_ssl_config_core_2_11_0_3_7.xml
create mode 100644 .idea/libraries/Maven__commons_cli_commons_cli_1_3_1.xml
create mode 100644 .idea/libraries/Maven__commons_collections_commons_collections_3_2_2.xml
create mode 100644 .idea/libraries/Maven__commons_io_commons_io_2_7.xml
create mode 100644 .idea/libraries/Maven__jakarta_annotation_jakarta_annotation_api_1_3_5.xml
create mode 100644 .idea/libraries/Maven__org_apache_calcite_avatica_avatica_core_1_17_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_commons_commons_compress_1_20.xml
create mode 100644 .idea/libraries/Maven__org_apache_commons_commons_lang3_3_3_2.xml
create mode 100644 .idea/libraries/Maven__org_apache_commons_commons_math3_3_5.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_annotations_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_clients_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_connector_base_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_connector_files_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_connector_kafka_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_core_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_file_sink_common_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_hadoop_fs_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_java_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_metrics_core_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_optimizer_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_queryable_state_client_java_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_runtime_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_scala_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_shaded_asm_7_7_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_shaded_guava_18_0_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_shaded_jackson_2_12_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_shaded_netty_4_1_49_Final_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_shaded_zookeeper_3_3_4_14_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_streaming_java_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_streaming_scala_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_api_java_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_api_java_bridge_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_api_scala_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_api_scala_bridge_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_common_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_planner_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_planner_blink_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_table_runtime_blink_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_flink_walkthrough_common_2_11_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_flink_force_shading_1_13_0.xml
create mode 100644 .idea/libraries/Maven__org_apache_kafka_kafka_clients_2_4_1.xml
create mode 100644 .idea/libraries/Maven__org_apache_logging_log4j_log4j_api_2_12_1.xml
create mode 100644 .idea/libraries/Maven__org_apache_logging_log4j_log4j_core_2_12_1.xml
create mode 100644 .idea/libraries/Maven__org_apache_logging_log4j_log4j_slf4j_impl_2_12_1.xml
create mode 100644 .idea/libraries/Maven__org_apache_logging_log4j_log4j_to_slf4j_2_14_1.xml
create mode 100644 .idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_core_9_0_50.xml
create mode 100644 .idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_el_9_0_50.xml
create mode 100644 .idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_websocket_9_0_50.xml
create mode 100644 .idea/libraries/Maven__org_clapper_grizzled_slf4j_2_11_1_3_2.xml
create mode 100644 .idea/libraries/Maven__org_codehaus_janino_commons_compiler_3_0_11.xml
create mode 100644 .idea/libraries/Maven__org_codehaus_janino_janino_3_0_11.xml
create mode 100644 .idea/libraries/Maven__org_javassist_javassist_3_24_0_GA.xml
create mode 100644 .idea/libraries/Maven__org_lz4_lz4_java_1_6_0.xml
create mode 100644 .idea/libraries/Maven__org_objenesis_objenesis_2_1.xml
create mode 100644 .idea/libraries/Maven__org_projectlombok_lombok_1_18_20.xml
create mode 100644 .idea/libraries/Maven__org_reactivestreams_reactive_streams_1_0_2.xml
create mode 100644 .idea/libraries/Maven__org_scala_lang_modules_scala_java8_compat_2_11_0_7_0.xml
create mode 100644 .idea/libraries/Maven__org_scala_lang_modules_scala_parser_combinators_2_11_1_0_4.xml
create mode 100644 .idea/libraries/Maven__org_scala_lang_modules_scala_xml_2_11_1_0_5.xml
create mode 100644 .idea/libraries/Maven__org_scala_lang_scala_compiler_2_11_12.xml
create mode 100644 .idea/libraries/Maven__org_scala_lang_scala_library_2_11_12.xml
create mode 100644 .idea/libraries/Maven__org_scala_lang_scala_reflect_2_11_12.xml
create mode 100644 .idea/libraries/Maven__org_slf4j_jul_to_slf4j_1_7_32.xml
create mode 100644 .idea/libraries/Maven__org_slf4j_slf4j_api_1_7_15.xml
create mode 100644 .idea/libraries/Maven__org_springframework_boot_spring_boot_2_5_3.xml
create mode 100644 .idea/libraries/Maven__org_springframework_boot_spring_boot_autoconfigure_2_5_3.xml
create mode 100644 .idea/libraries/Maven__org_springframework_boot_spring_boot_starter_2_5_3.xml
create mode 100644 .idea/libraries/Maven__org_springframework_boot_spring_boot_starter_json_2_5_3.xml
create mode 100644 .idea/libraries/Maven__org_springframework_boot_spring_boot_starter_logging_2_5_3.xml
create mode 100644 .idea/libraries/Maven__org_springframework_boot_spring_boot_starter_tomcat_2_5_3.xml
create mode 100644 .idea/libraries/Maven__org_springframework_boot_spring_boot_starter_web_2_5_3.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_aop_5_3_8.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_beans_5_3_8.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_context_5_3_8.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_core_5_3_8.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_expression_5_3_8.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_jcl_5_3_8.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_web_5_3_9.xml
create mode 100644 .idea/libraries/Maven__org_springframework_spring_webmvc_5_3_9.xml
create mode 100644 .idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_8_3.xml
create mode 100644 .idea/libraries/Maven__org_yaml_snakeyaml_1_28.xml
create mode 100644 .idea/misc.xml
create mode 100644 .idea/modules.xml
create mode 100644 data.txt/1
create mode 100644 data.txt/2
create mode 100644 data.txt/3
create mode 100644 data.txt/4
create mode 100644 data.txt/5
create mode 100644 data.txt/6
create mode 100644 data.txt/7
create mode 100644 data.txt/8
create mode 100644 flink-test.iml
create mode 100644 pom.xml
create mode 100644 src/main/java/com/kedacom/apitest/function/StatusStatisticsByDay.java
create mode 100644 src/main/java/com/kedacom/apitest/function/TopNFunction.java
create mode 100644 src/main/java/com/kedacom/apitest/sink/KafkaSinkTest.java
create mode 100644 src/main/java/com/kedacom/apitest/source/DeviceInfoReading.java
create mode 100644 src/main/java/com/kedacom/apitest/source/KafkaSourceTest.java
create mode 100644 src/main/java/com/kedacom/apitest/source/ReduceTest.java
create mode 100644 src/main/java/com/kedacom/apitest/source/SelfSourceTest.java
create mode 100644 src/main/java/com/kedacom/apitest/source/SplitStreamTest.java
create mode 100644 src/main/java/com/kedacom/apitest/state/KeyedStateApplicationTest.java
create mode 100644 src/main/java/com/kedacom/apitest/state/KeyedStateTest.java
create mode 100644 src/main/java/com/kedacom/apitest/state/OpetateStateTest.java
create mode 100644 src/main/java/com/kedacom/apitest/tableapi/TableTest1.java
create mode 100644 src/main/java/com/kedacom/apitest/tableapi/udf/UdfAggregateFunctionTest.java
create mode 100644 src/main/java/com/kedacom/apitest/tableapi/udf/UdfScalarFunctionTest.java
create mode 100644 src/main/java/com/kedacom/apitest/tableapi/udf/UdfTableFunctionTest.java
create mode 100644 src/main/java/com/kedacom/apitest/window/SocketWindowTest.java
create mode 100644 src/main/java/com/kedacom/apitest/window/WatermarkTest.java
create mode 100644 src/main/java/com/kedacom/apitest/window/WindowTest1.java
create mode 100644 src/main/java/com/kedacom/pojo/CarNumCount.java
create mode 100644 src/main/java/com/kedacom/pojo/DeviceInfo.java
create mode 100644 src/main/java/com/kedacom/pojo/DeviceStatusStt.java
create mode 100644 src/main/java/com/kedacom/wc/SocketWordCount.java
create mode 100644 src/main/java/com/kedacom/wc/StreamWordCount.java
create mode 100644 src/main/java/com/kedacom/wc/WordCount.java
create mode 100644 src/main/resources/carnum.txt
create mode 100644 src/main/resources/deviceinfo.txt
create mode 100644 src/main/resources/hello.txt
create mode 100644 src/main/resources/log4j2.properties
create mode 100644 target/classes/carnum.txt
create mode 100644 target/classes/com/kedacom/apitest/function/StatusStatisticsByDay$1.class
create mode 100644 target/classes/com/kedacom/apitest/function/StatusStatisticsByDay$ProcessDayStt$1.class
create mode 100644 target/classes/com/kedacom/apitest/function/StatusStatisticsByDay$ProcessDayStt.class
create mode 100644 target/classes/com/kedacom/apitest/function/StatusStatisticsByDay.class
create mode 100644 target/classes/com/kedacom/apitest/function/TopNFunction$1.class
create mode 100644 target/classes/com/kedacom/apitest/function/TopNFunction$CarCountAgg.class
create mode 100644 target/classes/com/kedacom/apitest/function/TopNFunction$ResultWindow.class
create mode 100644 target/classes/com/kedacom/apitest/function/TopNFunction$TopNHotCarNum$1.class
create mode 100644 target/classes/com/kedacom/apitest/function/TopNFunction$TopNHotCarNum.class
create mode 100644 target/classes/com/kedacom/apitest/function/TopNFunction.class
create mode 100644 target/classes/com/kedacom/apitest/sink/KafkaSinkTest$DeviceSource.class
create mode 100644 target/classes/com/kedacom/apitest/sink/KafkaSinkTest.class
create mode 100644 target/classes/com/kedacom/apitest/source/DeviceInfoReading.class
create mode 100644 target/classes/com/kedacom/apitest/source/KafkaSourceTest.class
create mode 100644 target/classes/com/kedacom/apitest/source/ReduceTest$DeviceSource.class
create mode 100644 target/classes/com/kedacom/apitest/source/ReduceTest.class
create mode 100644 target/classes/com/kedacom/apitest/source/SelfSourceTest$DeviceSource.class
create mode 100644 target/classes/com/kedacom/apitest/source/SelfSourceTest.class
create mode 100644 target/classes/com/kedacom/apitest/source/SplitStreamTest$1.class
create mode 100644 target/classes/com/kedacom/apitest/source/SplitStreamTest$2.class
create mode 100644 target/classes/com/kedacom/apitest/source/SplitStreamTest$3.class
create mode 100644 target/classes/com/kedacom/apitest/source/SplitStreamTest$DeviceSource.class
create mode 100644 target/classes/com/kedacom/apitest/source/SplitStreamTest.class
create mode 100644 target/classes/com/kedacom/apitest/state/KeyedStateApplicationTest$StatusSttFunction.class
create mode 100644 target/classes/com/kedacom/apitest/state/KeyedStateApplicationTest.class
create mode 100644 target/classes/com/kedacom/apitest/state/KeyedStateTest$MyKeyedState.class
create mode 100644 target/classes/com/kedacom/apitest/state/KeyedStateTest.class
create mode 100644 target/classes/com/kedacom/apitest/state/OpetateStateTest$CountMapper.class
create mode 100644 target/classes/com/kedacom/apitest/state/OpetateStateTest$MyCountMapper.class
create mode 100644 target/classes/com/kedacom/apitest/state/OpetateStateTest.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/TableTest1$1.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/TableTest1.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/udf/UdfAggregateFunctionTest$AverageCarNum.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/udf/UdfAggregateFunctionTest.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/udf/UdfScalarFunctionTest$HashCode.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/udf/UdfScalarFunctionTest.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/udf/UdfTableFunctionTest$SplitName.class
create mode 100644 target/classes/com/kedacom/apitest/tableapi/udf/UdfTableFunctionTest.class
create mode 100644 target/classes/com/kedacom/apitest/window/SocketWindowTest$1.class
create mode 100644 target/classes/com/kedacom/apitest/window/SocketWindowTest$2.class
create mode 100644 target/classes/com/kedacom/apitest/window/SocketWindowTest$3.class
create mode 100644 target/classes/com/kedacom/apitest/window/SocketWindowTest.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$CarCountAgg.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$DeviceSource.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$MyTimeAssigner$1.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$MyTimeAssigner.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$MyTimestampAndWatermarks$1.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$MyTimestampAndWatermarks.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$ResultWindow.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$TopNHotCarNum$1.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest$TopNHotCarNum.class
create mode 100644 target/classes/com/kedacom/apitest/window/WatermarkTest.class
create mode 100644 target/classes/com/kedacom/apitest/window/WindowTest1$1.class
create mode 100644 target/classes/com/kedacom/apitest/window/WindowTest1$2.class
create mode 100644 target/classes/com/kedacom/apitest/window/WindowTest1$DeviceSource.class
create mode 100644 target/classes/com/kedacom/apitest/window/WindowTest1.class
create mode 100644 target/classes/com/kedacom/pojo/CarNumCount.class
create mode 100644 target/classes/com/kedacom/pojo/DeviceInfo.class
create mode 100644 target/classes/com/kedacom/pojo/DeviceStatusStt.class
create mode 100644 target/classes/com/kedacom/wc/SocketWordCount$MyFlatMap.class
create mode 100644 target/classes/com/kedacom/wc/SocketWordCount.class
create mode 100644 target/classes/com/kedacom/wc/StreamWordCount$MyFlatMap.class
create mode 100644 target/classes/com/kedacom/wc/StreamWordCount.class
create mode 100644 target/classes/com/kedacom/wc/WordCount$MyFlatMap.class
create mode 100644 target/classes/com/kedacom/wc/WordCount.class
create mode 100644 target/classes/deviceinfo.txt
create mode 100644 target/classes/hello.txt
create mode 100644 target/classes/log4j2.properties
create mode 100644 target/flink-test-1.0-SNAPSHOT.jar
create mode 100644 target/maven-archiver/pom.properties
create mode 100644 target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
create mode 100644 target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst
create mode 100644 target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..633ef67
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Datasource local storage ignored files
+/../../../../:\practice\flink-test\.idea/dataSources/
+/dataSources.local.xml
+# Editor-based HTTP Client requests
+/httpRequests/
diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
new file mode 100644
index 0000000..a55e7a1
--- /dev/null
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/compiler.xml b/.idea/compiler.xml
new file mode 100644
index 0000000..6b00516
--- /dev/null
+++ b/.idea/compiler.xml
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/encodings.xml b/.idea/encodings.xml
new file mode 100644
index 0000000..aa00ffa
--- /dev/null
+++ b/.idea/encodings.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml
new file mode 100644
index 0000000..980588c
--- /dev/null
+++ b/.idea/jarRepositories.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__ch_qos_logback_logback_classic_1_2_4.xml b/.idea/libraries/Maven__ch_qos_logback_logback_classic_1_2_4.xml
new file mode 100644
index 0000000..4c5df29
--- /dev/null
+++ b/.idea/libraries/Maven__ch_qos_logback_logback_classic_1_2_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__ch_qos_logback_logback_core_1_2_4.xml b/.idea/libraries/Maven__ch_qos_logback_logback_core_1_2_4.xml
new file mode 100644
index 0000000..1e5c99e
--- /dev/null
+++ b/.idea/libraries/Maven__ch_qos_logback_logback_core_1_2_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_esotericsoftware_kryo_kryo_2_24_0.xml b/.idea/libraries/Maven__com_esotericsoftware_kryo_kryo_2_24_0.xml
new file mode 100644
index 0000000..28097c4
--- /dev/null
+++ b/.idea/libraries/Maven__com_esotericsoftware_kryo_kryo_2_24_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_esotericsoftware_minlog_minlog_1_2.xml b/.idea/libraries/Maven__com_esotericsoftware_minlog_minlog_1_2.xml
new file mode 100644
index 0000000..f926eec
--- /dev/null
+++ b/.idea/libraries/Maven__com_esotericsoftware_minlog_minlog_1_2.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_annotations_2_12_4.xml b/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_annotations_2_12_4.xml
new file mode 100644
index 0000000..5351aad
--- /dev/null
+++ b/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_annotations_2_12_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_core_2_12_4.xml b/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_core_2_12_4.xml
new file mode 100644
index 0000000..b61928e
--- /dev/null
+++ b/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_core_2_12_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_databind_2_12_4.xml b/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_databind_2_12_4.xml
new file mode 100644
index 0000000..3088e4d
--- /dev/null
+++ b/.idea/libraries/Maven__com_fasterxml_jackson_core_jackson_databind_2_12_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jdk8_2_12_4.xml b/.idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jdk8_2_12_4.xml
new file mode 100644
index 0000000..03661ab
--- /dev/null
+++ b/.idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jdk8_2_12_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jsr310_2_12_4.xml b/.idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jsr310_2_12_4.xml
new file mode 100644
index 0000000..518a6c9
--- /dev/null
+++ b/.idea/libraries/Maven__com_fasterxml_jackson_datatype_jackson_datatype_jsr310_2_12_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_fasterxml_jackson_module_jackson_module_parameter_names_2_12_4.xml b/.idea/libraries/Maven__com_fasterxml_jackson_module_jackson_module_parameter_names_2_12_4.xml
new file mode 100644
index 0000000..9c5251c
--- /dev/null
+++ b/.idea/libraries/Maven__com_fasterxml_jackson_module_jackson_module_parameter_names_2_12_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_github_luben_zstd_jni_1_4_3_1.xml b/.idea/libraries/Maven__com_github_luben_zstd_jni_1_4_3_1.xml
new file mode 100644
index 0000000..dfa7e53
--- /dev/null
+++ b/.idea/libraries/Maven__com_github_luben_zstd_jni_1_4_3_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_github_scopt_scopt_2_11_3_5_0.xml b/.idea/libraries/Maven__com_github_scopt_scopt_2_11_3_5_0.xml
new file mode 100644
index 0000000..e52bec4
--- /dev/null
+++ b/.idea/libraries/Maven__com_github_scopt_scopt_2_11_3_5_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_google_code_findbugs_jsr305_1_3_9.xml b/.idea/libraries/Maven__com_google_code_findbugs_jsr305_1_3_9.xml
new file mode 100644
index 0000000..0e66824
--- /dev/null
+++ b/.idea/libraries/Maven__com_google_code_findbugs_jsr305_1_3_9.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_twitter_chill_2_11_0_7_6.xml b/.idea/libraries/Maven__com_twitter_chill_2_11_0_7_6.xml
new file mode 100644
index 0000000..ea68d3c
--- /dev/null
+++ b/.idea/libraries/Maven__com_twitter_chill_2_11_0_7_6.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_twitter_chill_java_0_7_6.xml b/.idea/libraries/Maven__com_twitter_chill_java_0_7_6.xml
new file mode 100644
index 0000000..c08e25c
--- /dev/null
+++ b/.idea/libraries/Maven__com_twitter_chill_java_0_7_6.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_typesafe_akka_akka_actor_2_11_2_5_21.xml b/.idea/libraries/Maven__com_typesafe_akka_akka_actor_2_11_2_5_21.xml
new file mode 100644
index 0000000..215c2c0
--- /dev/null
+++ b/.idea/libraries/Maven__com_typesafe_akka_akka_actor_2_11_2_5_21.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_typesafe_akka_akka_protobuf_2_11_2_5_21.xml b/.idea/libraries/Maven__com_typesafe_akka_akka_protobuf_2_11_2_5_21.xml
new file mode 100644
index 0000000..96dbc63
--- /dev/null
+++ b/.idea/libraries/Maven__com_typesafe_akka_akka_protobuf_2_11_2_5_21.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_typesafe_akka_akka_slf4j_2_11_2_5_21.xml b/.idea/libraries/Maven__com_typesafe_akka_akka_slf4j_2_11_2_5_21.xml
new file mode 100644
index 0000000..6153dc3
--- /dev/null
+++ b/.idea/libraries/Maven__com_typesafe_akka_akka_slf4j_2_11_2_5_21.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_typesafe_akka_akka_stream_2_11_2_5_21.xml b/.idea/libraries/Maven__com_typesafe_akka_akka_stream_2_11_2_5_21.xml
new file mode 100644
index 0000000..dad652b
--- /dev/null
+++ b/.idea/libraries/Maven__com_typesafe_akka_akka_stream_2_11_2_5_21.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_typesafe_config_1_3_3.xml b/.idea/libraries/Maven__com_typesafe_config_1_3_3.xml
new file mode 100644
index 0000000..8db5e62
--- /dev/null
+++ b/.idea/libraries/Maven__com_typesafe_config_1_3_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__com_typesafe_ssl_config_core_2_11_0_3_7.xml b/.idea/libraries/Maven__com_typesafe_ssl_config_core_2_11_0_3_7.xml
new file mode 100644
index 0000000..0f9d94e
--- /dev/null
+++ b/.idea/libraries/Maven__com_typesafe_ssl_config_core_2_11_0_3_7.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__commons_cli_commons_cli_1_3_1.xml b/.idea/libraries/Maven__commons_cli_commons_cli_1_3_1.xml
new file mode 100644
index 0000000..a1510b9
--- /dev/null
+++ b/.idea/libraries/Maven__commons_cli_commons_cli_1_3_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__commons_collections_commons_collections_3_2_2.xml b/.idea/libraries/Maven__commons_collections_commons_collections_3_2_2.xml
new file mode 100644
index 0000000..13afda2
--- /dev/null
+++ b/.idea/libraries/Maven__commons_collections_commons_collections_3_2_2.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__commons_io_commons_io_2_7.xml b/.idea/libraries/Maven__commons_io_commons_io_2_7.xml
new file mode 100644
index 0000000..ded72c9
--- /dev/null
+++ b/.idea/libraries/Maven__commons_io_commons_io_2_7.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__jakarta_annotation_jakarta_annotation_api_1_3_5.xml b/.idea/libraries/Maven__jakarta_annotation_jakarta_annotation_api_1_3_5.xml
new file mode 100644
index 0000000..cba9dd2
--- /dev/null
+++ b/.idea/libraries/Maven__jakarta_annotation_jakarta_annotation_api_1_3_5.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_calcite_avatica_avatica_core_1_17_0.xml b/.idea/libraries/Maven__org_apache_calcite_avatica_avatica_core_1_17_0.xml
new file mode 100644
index 0000000..409cae9
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_calcite_avatica_avatica_core_1_17_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_commons_commons_compress_1_20.xml b/.idea/libraries/Maven__org_apache_commons_commons_compress_1_20.xml
new file mode 100644
index 0000000..b762f0e
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_commons_commons_compress_1_20.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_commons_commons_lang3_3_3_2.xml b/.idea/libraries/Maven__org_apache_commons_commons_lang3_3_3_2.xml
new file mode 100644
index 0000000..83cba3e
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_commons_commons_lang3_3_3_2.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_commons_commons_math3_3_5.xml b/.idea/libraries/Maven__org_apache_commons_commons_math3_3_5.xml
new file mode 100644
index 0000000..0575123
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_commons_commons_math3_3_5.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_annotations_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_annotations_1_13_0.xml
new file mode 100644
index 0000000..ebf65f5
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_annotations_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_clients_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_clients_2_11_1_13_0.xml
new file mode 100644
index 0000000..232be22
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_clients_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_connector_base_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_connector_base_1_13_0.xml
new file mode 100644
index 0000000..e2e37b0
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_connector_base_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_connector_files_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_connector_files_1_13_0.xml
new file mode 100644
index 0000000..dcc4ba8
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_connector_files_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_connector_kafka_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_connector_kafka_2_11_1_13_0.xml
new file mode 100644
index 0000000..5e1db12
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_connector_kafka_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_core_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_core_1_13_0.xml
new file mode 100644
index 0000000..b95e64c
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_core_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_file_sink_common_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_file_sink_common_1_13_0.xml
new file mode 100644
index 0000000..c1c9f1c
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_file_sink_common_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_hadoop_fs_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_hadoop_fs_1_13_0.xml
new file mode 100644
index 0000000..040c9ef
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_hadoop_fs_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_java_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_java_1_13_0.xml
new file mode 100644
index 0000000..362f8bc
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_java_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_metrics_core_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_metrics_core_1_13_0.xml
new file mode 100644
index 0000000..698be44
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_metrics_core_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_optimizer_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_optimizer_2_11_1_13_0.xml
new file mode 100644
index 0000000..599b3eb
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_optimizer_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_queryable_state_client_java_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_queryable_state_client_java_1_13_0.xml
new file mode 100644
index 0000000..6c7078a
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_queryable_state_client_java_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_runtime_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_runtime_2_11_1_13_0.xml
new file mode 100644
index 0000000..cce9abf
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_runtime_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_scala_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_scala_2_11_1_13_0.xml
new file mode 100644
index 0000000..66d771b
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_scala_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_shaded_asm_7_7_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_shaded_asm_7_7_1_13_0.xml
new file mode 100644
index 0000000..8103dcf
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_shaded_asm_7_7_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_shaded_guava_18_0_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_shaded_guava_18_0_13_0.xml
new file mode 100644
index 0000000..1f8766a
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_shaded_guava_18_0_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_shaded_jackson_2_12_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_shaded_jackson_2_12_1_13_0.xml
new file mode 100644
index 0000000..bf67e2e
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_shaded_jackson_2_12_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_shaded_netty_4_1_49_Final_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_shaded_netty_4_1_49_Final_13_0.xml
new file mode 100644
index 0000000..a11d2b0
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_shaded_netty_4_1_49_Final_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_shaded_zookeeper_3_3_4_14_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_shaded_zookeeper_3_3_4_14_13_0.xml
new file mode 100644
index 0000000..f944e1a
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_shaded_zookeeper_3_3_4_14_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_streaming_java_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_streaming_java_2_11_1_13_0.xml
new file mode 100644
index 0000000..4247a00
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_streaming_java_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_streaming_scala_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_streaming_scala_2_11_1_13_0.xml
new file mode 100644
index 0000000..c2cd9ef
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_streaming_scala_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_api_java_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_api_java_1_13_0.xml
new file mode 100644
index 0000000..dfd0ea5
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_api_java_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_api_java_bridge_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_api_java_bridge_2_11_1_13_0.xml
new file mode 100644
index 0000000..137fbad
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_api_java_bridge_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_api_scala_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_api_scala_2_11_1_13_0.xml
new file mode 100644
index 0000000..0a40224
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_api_scala_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_api_scala_bridge_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_api_scala_bridge_2_11_1_13_0.xml
new file mode 100644
index 0000000..29d4108
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_api_scala_bridge_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_common_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_common_1_13_0.xml
new file mode 100644
index 0000000..f431c42
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_common_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_planner_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_planner_2_11_1_13_0.xml
new file mode 100644
index 0000000..68d9922
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_planner_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_planner_blink_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_planner_blink_2_11_1_13_0.xml
new file mode 100644
index 0000000..b0e4a2e
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_planner_blink_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_table_runtime_blink_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_table_runtime_blink_2_11_1_13_0.xml
new file mode 100644
index 0000000..163ab47
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_table_runtime_blink_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_flink_walkthrough_common_2_11_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_flink_walkthrough_common_2_11_1_13_0.xml
new file mode 100644
index 0000000..c6b9d9e
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_flink_walkthrough_common_2_11_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_flink_force_shading_1_13_0.xml b/.idea/libraries/Maven__org_apache_flink_force_shading_1_13_0.xml
new file mode 100644
index 0000000..540f0a6
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_flink_force_shading_1_13_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_kafka_kafka_clients_2_4_1.xml b/.idea/libraries/Maven__org_apache_kafka_kafka_clients_2_4_1.xml
new file mode 100644
index 0000000..8da5c0b
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_kafka_kafka_clients_2_4_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_logging_log4j_log4j_api_2_12_1.xml b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_api_2_12_1.xml
new file mode 100644
index 0000000..ad03854
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_api_2_12_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_logging_log4j_log4j_core_2_12_1.xml b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_core_2_12_1.xml
new file mode 100644
index 0000000..c1d6a13
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_core_2_12_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_logging_log4j_log4j_slf4j_impl_2_12_1.xml b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_slf4j_impl_2_12_1.xml
new file mode 100644
index 0000000..c4b9506
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_slf4j_impl_2_12_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_logging_log4j_log4j_to_slf4j_2_14_1.xml b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_to_slf4j_2_14_1.xml
new file mode 100644
index 0000000..ae5c0b5
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_logging_log4j_log4j_to_slf4j_2_14_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_core_9_0_50.xml b/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_core_9_0_50.xml
new file mode 100644
index 0000000..add4136
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_core_9_0_50.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_el_9_0_50.xml b/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_el_9_0_50.xml
new file mode 100644
index 0000000..f3aab20
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_el_9_0_50.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_websocket_9_0_50.xml b/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_websocket_9_0_50.xml
new file mode 100644
index 0000000..149da37
--- /dev/null
+++ b/.idea/libraries/Maven__org_apache_tomcat_embed_tomcat_embed_websocket_9_0_50.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_clapper_grizzled_slf4j_2_11_1_3_2.xml b/.idea/libraries/Maven__org_clapper_grizzled_slf4j_2_11_1_3_2.xml
new file mode 100644
index 0000000..3bda224
--- /dev/null
+++ b/.idea/libraries/Maven__org_clapper_grizzled_slf4j_2_11_1_3_2.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_codehaus_janino_commons_compiler_3_0_11.xml b/.idea/libraries/Maven__org_codehaus_janino_commons_compiler_3_0_11.xml
new file mode 100644
index 0000000..ed3fac2
--- /dev/null
+++ b/.idea/libraries/Maven__org_codehaus_janino_commons_compiler_3_0_11.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_codehaus_janino_janino_3_0_11.xml b/.idea/libraries/Maven__org_codehaus_janino_janino_3_0_11.xml
new file mode 100644
index 0000000..d3306e3
--- /dev/null
+++ b/.idea/libraries/Maven__org_codehaus_janino_janino_3_0_11.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_javassist_javassist_3_24_0_GA.xml b/.idea/libraries/Maven__org_javassist_javassist_3_24_0_GA.xml
new file mode 100644
index 0000000..a9de551
--- /dev/null
+++ b/.idea/libraries/Maven__org_javassist_javassist_3_24_0_GA.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_lz4_lz4_java_1_6_0.xml b/.idea/libraries/Maven__org_lz4_lz4_java_1_6_0.xml
new file mode 100644
index 0000000..f73a838
--- /dev/null
+++ b/.idea/libraries/Maven__org_lz4_lz4_java_1_6_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_objenesis_objenesis_2_1.xml b/.idea/libraries/Maven__org_objenesis_objenesis_2_1.xml
new file mode 100644
index 0000000..7ab319b
--- /dev/null
+++ b/.idea/libraries/Maven__org_objenesis_objenesis_2_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_projectlombok_lombok_1_18_20.xml b/.idea/libraries/Maven__org_projectlombok_lombok_1_18_20.xml
new file mode 100644
index 0000000..138abdc
--- /dev/null
+++ b/.idea/libraries/Maven__org_projectlombok_lombok_1_18_20.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_reactivestreams_reactive_streams_1_0_2.xml b/.idea/libraries/Maven__org_reactivestreams_reactive_streams_1_0_2.xml
new file mode 100644
index 0000000..457c7b7
--- /dev/null
+++ b/.idea/libraries/Maven__org_reactivestreams_reactive_streams_1_0_2.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_scala_lang_modules_scala_java8_compat_2_11_0_7_0.xml b/.idea/libraries/Maven__org_scala_lang_modules_scala_java8_compat_2_11_0_7_0.xml
new file mode 100644
index 0000000..f4b289b
--- /dev/null
+++ b/.idea/libraries/Maven__org_scala_lang_modules_scala_java8_compat_2_11_0_7_0.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_scala_lang_modules_scala_parser_combinators_2_11_1_0_4.xml b/.idea/libraries/Maven__org_scala_lang_modules_scala_parser_combinators_2_11_1_0_4.xml
new file mode 100644
index 0000000..536c927
--- /dev/null
+++ b/.idea/libraries/Maven__org_scala_lang_modules_scala_parser_combinators_2_11_1_0_4.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_scala_lang_modules_scala_xml_2_11_1_0_5.xml b/.idea/libraries/Maven__org_scala_lang_modules_scala_xml_2_11_1_0_5.xml
new file mode 100644
index 0000000..50fb8f0
--- /dev/null
+++ b/.idea/libraries/Maven__org_scala_lang_modules_scala_xml_2_11_1_0_5.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_scala_lang_scala_compiler_2_11_12.xml b/.idea/libraries/Maven__org_scala_lang_scala_compiler_2_11_12.xml
new file mode 100644
index 0000000..db4ebd6
--- /dev/null
+++ b/.idea/libraries/Maven__org_scala_lang_scala_compiler_2_11_12.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_scala_lang_scala_library_2_11_12.xml b/.idea/libraries/Maven__org_scala_lang_scala_library_2_11_12.xml
new file mode 100644
index 0000000..6f45ebc
--- /dev/null
+++ b/.idea/libraries/Maven__org_scala_lang_scala_library_2_11_12.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_scala_lang_scala_reflect_2_11_12.xml b/.idea/libraries/Maven__org_scala_lang_scala_reflect_2_11_12.xml
new file mode 100644
index 0000000..e1fe4d2
--- /dev/null
+++ b/.idea/libraries/Maven__org_scala_lang_scala_reflect_2_11_12.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_slf4j_jul_to_slf4j_1_7_32.xml b/.idea/libraries/Maven__org_slf4j_jul_to_slf4j_1_7_32.xml
new file mode 100644
index 0000000..a758eac
--- /dev/null
+++ b/.idea/libraries/Maven__org_slf4j_jul_to_slf4j_1_7_32.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_slf4j_slf4j_api_1_7_15.xml b/.idea/libraries/Maven__org_slf4j_slf4j_api_1_7_15.xml
new file mode 100644
index 0000000..9468916
--- /dev/null
+++ b/.idea/libraries/Maven__org_slf4j_slf4j_api_1_7_15.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_boot_spring_boot_2_5_3.xml b/.idea/libraries/Maven__org_springframework_boot_spring_boot_2_5_3.xml
new file mode 100644
index 0000000..9692f2c
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_boot_spring_boot_2_5_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_boot_spring_boot_autoconfigure_2_5_3.xml b/.idea/libraries/Maven__org_springframework_boot_spring_boot_autoconfigure_2_5_3.xml
new file mode 100644
index 0000000..2de3bc4
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_boot_spring_boot_autoconfigure_2_5_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_2_5_3.xml b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_2_5_3.xml
new file mode 100644
index 0000000..b31fb3d
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_2_5_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_json_2_5_3.xml b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_json_2_5_3.xml
new file mode 100644
index 0000000..d70f5b2
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_json_2_5_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_logging_2_5_3.xml b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_logging_2_5_3.xml
new file mode 100644
index 0000000..c26578f
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_logging_2_5_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_tomcat_2_5_3.xml b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_tomcat_2_5_3.xml
new file mode 100644
index 0000000..a8e470e
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_tomcat_2_5_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_web_2_5_3.xml b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_web_2_5_3.xml
new file mode 100644
index 0000000..a5e1c5d
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_boot_spring_boot_starter_web_2_5_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_aop_5_3_8.xml b/.idea/libraries/Maven__org_springframework_spring_aop_5_3_8.xml
new file mode 100644
index 0000000..bb8ab5e
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_aop_5_3_8.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_beans_5_3_8.xml b/.idea/libraries/Maven__org_springframework_spring_beans_5_3_8.xml
new file mode 100644
index 0000000..4f174a5
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_beans_5_3_8.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_context_5_3_8.xml b/.idea/libraries/Maven__org_springframework_spring_context_5_3_8.xml
new file mode 100644
index 0000000..80365ee
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_context_5_3_8.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_core_5_3_8.xml b/.idea/libraries/Maven__org_springframework_spring_core_5_3_8.xml
new file mode 100644
index 0000000..33d5fb1
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_core_5_3_8.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_expression_5_3_8.xml b/.idea/libraries/Maven__org_springframework_spring_expression_5_3_8.xml
new file mode 100644
index 0000000..7f848e0
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_expression_5_3_8.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_jcl_5_3_8.xml b/.idea/libraries/Maven__org_springframework_spring_jcl_5_3_8.xml
new file mode 100644
index 0000000..704e936
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_jcl_5_3_8.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_web_5_3_9.xml b/.idea/libraries/Maven__org_springframework_spring_web_5_3_9.xml
new file mode 100644
index 0000000..eeb831a
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_web_5_3_9.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_springframework_spring_webmvc_5_3_9.xml b/.idea/libraries/Maven__org_springframework_spring_webmvc_5_3_9.xml
new file mode 100644
index 0000000..b4ee97b
--- /dev/null
+++ b/.idea/libraries/Maven__org_springframework_spring_webmvc_5_3_9.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_8_3.xml b/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_8_3.xml
new file mode 100644
index 0000000..e411463
--- /dev/null
+++ b/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_8_3.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_yaml_snakeyaml_1_28.xml b/.idea/libraries/Maven__org_yaml_snakeyaml_1_28.xml
new file mode 100644
index 0000000..1f853f7
--- /dev/null
+++ b/.idea/libraries/Maven__org_yaml_snakeyaml_1_28.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..e8942bd
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..c9e3681
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/data.txt/1 b/data.txt/1
new file mode 100644
index 0000000..cc98e9a
--- /dev/null
+++ b/data.txt/1
@@ -0,0 +1,39 @@
+DeviceInfo{name='device_8', status=1, catNum=71, time=1626007474}
+DeviceInfo{name='device_6', status=0, catNum=74, time=1626007475}
+DeviceInfo{name='device_9', status=1, catNum=29, time=1626007475}
+DeviceInfo{name='device_4', status=0, catNum=98, time=1626007476}
+DeviceInfo{name='device_2', status=1, catNum=13, time=1626007477}
+DeviceInfo{name='device_8', status=1, catNum=59, time=1626007478}
+DeviceInfo{name='device_6', status=1, catNum=45, time=1626007479}
+DeviceInfo{name='device_9', status=1, catNum=45, time=1626007479}
+DeviceInfo{name='device_4', status=1, catNum=61, time=1626007480}
+DeviceInfo{name='device_2', status=0, catNum=6, time=1626007481}
+DeviceInfo{name='device_8', status=1, catNum=13, time=1626007482}
+DeviceInfo{name='device_6', status=0, catNum=98, time=1626007483}
+DeviceInfo{name='device_9', status=1, catNum=31, time=1626007483}
+DeviceInfo{name='device_4', status=0, catNum=28, time=1626007484}
+DeviceInfo{name='device_2', status=1, catNum=91, time=1626007485}
+DeviceInfo{name='device_8', status=0, catNum=24, time=1626007486}
+DeviceInfo{name='device_6', status=1, catNum=33, time=1626007487}
+DeviceInfo{name='device_9', status=1, catNum=87, time=1626007487}
+DeviceInfo{name='device_4', status=0, catNum=94, time=1626007488}
+DeviceInfo{name='device_2', status=0, catNum=54, time=1626007489}
+DeviceInfo{name='device_8', status=0, catNum=44, time=1626007490}
+DeviceInfo{name='device_6', status=0, catNum=20, time=1626007491}
+DeviceInfo{name='device_9', status=0, catNum=2, time=1626007491}
+DeviceInfo{name='device_4', status=0, catNum=96, time=1626007492}
+DeviceInfo{name='device_2', status=1, catNum=39, time=1626007493}
+DeviceInfo{name='device_8', status=0, catNum=60, time=1626007494}
+DeviceInfo{name='device_6', status=1, catNum=1, time=1626007495}
+DeviceInfo{name='device_9', status=1, catNum=23, time=1626007495}
+DeviceInfo{name='device_4', status=0, catNum=66, time=1626007496}
+DeviceInfo{name='device_2', status=0, catNum=44, time=1626007497}
+DeviceInfo{name='device_8', status=1, catNum=73, time=1626007498}
+DeviceInfo{name='device_6', status=0, catNum=68, time=1626007499}
+DeviceInfo{name='device_9', status=0, catNum=42, time=1626007499}
+DeviceInfo{name='device_4', status=1, catNum=83, time=1626007500}
+DeviceInfo{name='device_2', status=1, catNum=73, time=1626007501}
+DeviceInfo{name='device_8', status=0, catNum=4, time=1626007502}
+DeviceInfo{name='device_6', status=0, catNum=54, time=1626007503}
+DeviceInfo{name='device_9', status=0, catNum=40, time=1626007503}
+DeviceInfo{name='device_4', status=1, catNum=95, time=1626007504}
diff --git a/data.txt/2 b/data.txt/2
new file mode 100644
index 0000000..520842e
--- /dev/null
+++ b/data.txt/2
@@ -0,0 +1,39 @@
+DeviceInfo{name='device_1', status=1, catNum=83, time=1626007474}
+DeviceInfo{name='device_7', status=1, catNum=73, time=1626007475}
+DeviceInfo{name='device_5', status=0, catNum=46, time=1626007476}
+DeviceInfo{name='device_10', status=1, catNum=23, time=1626007476}
+DeviceInfo{name='device_3', status=1, catNum=23, time=1626007477}
+DeviceInfo{name='device_1', status=1, catNum=67, time=1626007478}
+DeviceInfo{name='device_7', status=1, catNum=41, time=1626007479}
+DeviceInfo{name='device_5', status=1, catNum=31, time=1626007480}
+DeviceInfo{name='device_10', status=0, catNum=24, time=1626007480}
+DeviceInfo{name='device_3', status=0, catNum=8, time=1626007481}
+DeviceInfo{name='device_1', status=1, catNum=29, time=1626007482}
+DeviceInfo{name='device_7', status=0, catNum=6, time=1626007483}
+DeviceInfo{name='device_5', status=1, catNum=17, time=1626007484}
+DeviceInfo{name='device_10', status=1, catNum=17, time=1626007484}
+DeviceInfo{name='device_3', status=1, catNum=35, time=1626007485}
+DeviceInfo{name='device_1', status=1, catNum=5, time=1626007486}
+DeviceInfo{name='device_7', status=1, catNum=23, time=1626007487}
+DeviceInfo{name='device_5', status=0, catNum=66, time=1626007488}
+DeviceInfo{name='device_10', status=1, catNum=11, time=1626007488}
+DeviceInfo{name='device_3', status=1, catNum=69, time=1626007489}
+DeviceInfo{name='device_1', status=1, catNum=95, time=1626007490}
+DeviceInfo{name='device_7', status=1, catNum=79, time=1626007491}
+DeviceInfo{name='device_5', status=0, catNum=98, time=1626007492}
+DeviceInfo{name='device_10', status=0, catNum=72, time=1626007492}
+DeviceInfo{name='device_3', status=1, catNum=29, time=1626007493}
+DeviceInfo{name='device_1', status=0, catNum=22, time=1626007494}
+DeviceInfo{name='device_7', status=0, catNum=8, time=1626007495}
+DeviceInfo{name='device_5', status=0, catNum=92, time=1626007496}
+DeviceInfo{name='device_10', status=0, catNum=56, time=1626007496}
+DeviceInfo{name='device_3', status=0, catNum=22, time=1626007497}
+DeviceInfo{name='device_1', status=0, catNum=58, time=1626007498}
+DeviceInfo{name='device_7', status=1, catNum=37, time=1626007499}
+DeviceInfo{name='device_5', status=1, catNum=61, time=1626007500}
+DeviceInfo{name='device_10', status=1, catNum=25, time=1626007500}
+DeviceInfo{name='device_3', status=0, catNum=42, time=1626007501}
+DeviceInfo{name='device_1', status=0, catNum=32, time=1626007502}
+DeviceInfo{name='device_7', status=0, catNum=34, time=1626007503}
+DeviceInfo{name='device_5', status=0, catNum=88, time=1626007504}
+DeviceInfo{name='device_10', status=1, catNum=49, time=1626007504}
diff --git a/data.txt/3 b/data.txt/3
new file mode 100644
index 0000000..50077e5
--- /dev/null
+++ b/data.txt/3
@@ -0,0 +1,39 @@
+DeviceInfo{name='device_2', status=1, catNum=97, time=1626007474}
+DeviceInfo{name='device_8', status=1, catNum=69, time=1626007475}
+DeviceInfo{name='device_6', status=1, catNum=33, time=1626007476}
+DeviceInfo{name='device_9', status=1, catNum=9, time=1626007476}
+DeviceInfo{name='device_4', status=1, catNum=55, time=1626007477}
+DeviceInfo{name='device_2', status=1, catNum=3, time=1626007478}
+DeviceInfo{name='device_8', status=0, catNum=30, time=1626007479}
+DeviceInfo{name='device_6', status=0, catNum=80, time=1626007480}
+DeviceInfo{name='device_9', status=1, catNum=23, time=1626007480}
+DeviceInfo{name='device_4', status=0, catNum=28, time=1626007481}
+DeviceInfo{name='device_2', status=1, catNum=79, time=1626007482}
+DeviceInfo{name='device_8', status=0, catNum=42, time=1626007483}
+DeviceInfo{name='device_6', status=1, catNum=47, time=1626007484}
+DeviceInfo{name='device_9', status=0, catNum=42, time=1626007484}
+DeviceInfo{name='device_4', status=0, catNum=36, time=1626007485}
+DeviceInfo{name='device_2', status=1, catNum=19, time=1626007486}
+DeviceInfo{name='device_8', status=0, catNum=76, time=1626007487}
+DeviceInfo{name='device_6', status=0, catNum=60, time=1626007488}
+DeviceInfo{name='device_9', status=1, catNum=53, time=1626007488}
+DeviceInfo{name='device_4', status=0, catNum=76, time=1626007489}
+DeviceInfo{name='device_2', status=0, catNum=70, time=1626007490}
+DeviceInfo{name='device_8', status=1, catNum=67, time=1626007491}
+DeviceInfo{name='device_6', status=1, catNum=89, time=1626007492}
+DeviceInfo{name='device_9', status=0, catNum=70, time=1626007492}
+DeviceInfo{name='device_4', status=0, catNum=60, time=1626007493}
+DeviceInfo{name='device_2', status=1, catNum=11, time=1626007494}
+DeviceInfo{name='device_8', status=1, catNum=25, time=1626007495}
+DeviceInfo{name='device_6', status=1, catNum=43, time=1626007496}
+DeviceInfo{name='device_9', status=0, catNum=32, time=1626007496}
+DeviceInfo{name='device_4', status=0, catNum=8, time=1626007497}
+DeviceInfo{name='device_2', status=0, catNum=84, time=1626007498}
+DeviceInfo{name='device_8', status=0, catNum=36, time=1626007499}
+DeviceInfo{name='device_6', status=0, catNum=28, time=1626007500}
+DeviceInfo{name='device_9', status=1, catNum=79, time=1626007500}
+DeviceInfo{name='device_4', status=0, catNum=48, time=1626007501}
+DeviceInfo{name='device_2', status=1, catNum=27, time=1626007502}
+DeviceInfo{name='device_8', status=0, catNum=66, time=1626007503}
+DeviceInfo{name='device_6', status=1, catNum=23, time=1626007504}
+DeviceInfo{name='device_9', status=1, catNum=49, time=1626007504}
diff --git a/data.txt/4 b/data.txt/4
new file mode 100644
index 0000000..e238506
--- /dev/null
+++ b/data.txt/4
@@ -0,0 +1,38 @@
+DeviceInfo{name='device_3', status=0, catNum=10, time=1626007474}
+DeviceInfo{name='device_1', status=1, catNum=83, time=1626007475}
+DeviceInfo{name='device_7', status=1, catNum=37, time=1626007476}
+DeviceInfo{name='device_5', status=0, catNum=68, time=1626007477}
+DeviceInfo{name='device_10', status=0, catNum=54, time=1626007477}
+DeviceInfo{name='device_3', status=1, catNum=89, time=1626007478}
+DeviceInfo{name='device_1', status=0, catNum=16, time=1626007479}
+DeviceInfo{name='device_7', status=1, catNum=15, time=1626007480}
+DeviceInfo{name='device_5', status=1, catNum=75, time=1626007481}
+DeviceInfo{name='device_10', status=0, catNum=94, time=1626007481}
+DeviceInfo{name='device_3', status=0, catNum=34, time=1626007482}
+DeviceInfo{name='device_1', status=0, catNum=70, time=1626007483}
+DeviceInfo{name='device_7', status=1, catNum=49, time=1626007484}
+DeviceInfo{name='device_5', status=1, catNum=27, time=1626007485}
+DeviceInfo{name='device_10', status=1, catNum=89, time=1626007485}
+DeviceInfo{name='device_3', status=0, catNum=60, time=1626007486}
+DeviceInfo{name='device_1', status=1, catNum=23, time=1626007487}
+DeviceInfo{name='device_7', status=0, catNum=96, time=1626007488}
+DeviceInfo{name='device_5', status=0, catNum=36, time=1626007489}
+DeviceInfo{name='device_10', status=0, catNum=4, time=1626007489}
+DeviceInfo{name='device_3', status=1, catNum=11, time=1626007490}
+DeviceInfo{name='device_1', status=0, catNum=84, time=1626007491}
+DeviceInfo{name='device_7', status=1, catNum=17, time=1626007492}
+DeviceInfo{name='device_5', status=0, catNum=38, time=1626007493}
+DeviceInfo{name='device_10', status=0, catNum=84, time=1626007493}
+DeviceInfo{name='device_3', status=1, catNum=35, time=1626007494}
+DeviceInfo{name='device_1', status=1, catNum=39, time=1626007495}
+DeviceInfo{name='device_7', status=0, catNum=58, time=1626007496}
+DeviceInfo{name='device_5', status=0, catNum=74, time=1626007497}
+DeviceInfo{name='device_10', status=1, catNum=87, time=1626007497}
+DeviceInfo{name='device_3', status=0, catNum=68, time=1626007498}
+DeviceInfo{name='device_1', status=1, catNum=55, time=1626007499}
+DeviceInfo{name='device_7', status=1, catNum=35, time=1626007500}
+DeviceInfo{name='device_5', status=1, catNum=11, time=1626007501}
+DeviceInfo{name='device_10', status=0, catNum=34, time=1626007501}
+DeviceInfo{name='device_3', status=0, catNum=88, time=1626007502}
+DeviceInfo{name='device_1', status=0, catNum=28, time=1626007503}
+DeviceInfo{name='device_7', status=1, catNum=47, time=1626007504}
diff --git a/data.txt/5 b/data.txt/5
new file mode 100644
index 0000000..ad4242b
--- /dev/null
+++ b/data.txt/5
@@ -0,0 +1,38 @@
+DeviceInfo{name='device_4', status=1, catNum=45, time=1626007474}
+DeviceInfo{name='device_2', status=1, catNum=33, time=1626007475}
+DeviceInfo{name='device_8', status=0, catNum=14, time=1626007476}
+DeviceInfo{name='device_6', status=0, catNum=76, time=1626007477}
+DeviceInfo{name='device_9', status=0, catNum=42, time=1626007477}
+DeviceInfo{name='device_4', status=0, catNum=90, time=1626007478}
+DeviceInfo{name='device_2', status=0, catNum=76, time=1626007479}
+DeviceInfo{name='device_8', status=1, catNum=43, time=1626007480}
+DeviceInfo{name='device_6', status=1, catNum=13, time=1626007481}
+DeviceInfo{name='device_9', status=1, catNum=9, time=1626007481}
+DeviceInfo{name='device_4', status=1, catNum=93, time=1626007482}
+DeviceInfo{name='device_2', status=0, catNum=12, time=1626007483}
+DeviceInfo{name='device_8', status=0, catNum=56, time=1626007484}
+DeviceInfo{name='device_6', status=0, catNum=66, time=1626007485}
+DeviceInfo{name='device_9', status=1, catNum=11, time=1626007485}
+DeviceInfo{name='device_4', status=0, catNum=74, time=1626007486}
+DeviceInfo{name='device_2', status=0, catNum=82, time=1626007487}
+DeviceInfo{name='device_8', status=1, catNum=1, time=1626007488}
+DeviceInfo{name='device_6', status=1, catNum=81, time=1626007489}
+DeviceInfo{name='device_9', status=0, catNum=64, time=1626007489}
+DeviceInfo{name='device_4', status=0, catNum=10, time=1626007490}
+DeviceInfo{name='device_2', status=0, catNum=74, time=1626007491}
+DeviceInfo{name='device_8', status=1, catNum=99, time=1626007492}
+DeviceInfo{name='device_6', status=0, catNum=70, time=1626007493}
+DeviceInfo{name='device_9', status=1, catNum=1, time=1626007493}
+DeviceInfo{name='device_4', status=1, catNum=49, time=1626007494}
+DeviceInfo{name='device_2', status=1, catNum=53, time=1626007495}
+DeviceInfo{name='device_8', status=0, catNum=20, time=1626007496}
+DeviceInfo{name='device_6', status=1, catNum=69, time=1626007497}
+DeviceInfo{name='device_9', status=1, catNum=17, time=1626007497}
+DeviceInfo{name='device_4', status=1, catNum=99, time=1626007498}
+DeviceInfo{name='device_2', status=1, catNum=23, time=1626007499}
+DeviceInfo{name='device_8', status=1, catNum=75, time=1626007500}
+DeviceInfo{name='device_6', status=1, catNum=39, time=1626007501}
+DeviceInfo{name='device_9', status=0, catNum=30, time=1626007501}
+DeviceInfo{name='device_4', status=1, catNum=21, time=1626007502}
+DeviceInfo{name='device_2', status=0, catNum=4, time=1626007503}
+DeviceInfo{name='device_8', status=1, catNum=1, time=1626007504}
diff --git a/data.txt/6 b/data.txt/6
new file mode 100644
index 0000000..5e6f2c4
--- /dev/null
+++ b/data.txt/6
@@ -0,0 +1,39 @@
+DeviceInfo{name='device_5', status=1, catNum=91, time=1626007474}
+DeviceInfo{name='device_10', status=1, catNum=43, time=1626007474}
+DeviceInfo{name='device_3', status=1, catNum=3, time=1626007475}
+DeviceInfo{name='device_1', status=0, catNum=66, time=1626007476}
+DeviceInfo{name='device_7', status=1, catNum=97, time=1626007477}
+DeviceInfo{name='device_5', status=0, catNum=50, time=1626007478}
+DeviceInfo{name='device_10', status=0, catNum=94, time=1626007478}
+DeviceInfo{name='device_3', status=0, catNum=12, time=1626007479}
+DeviceInfo{name='device_1', status=0, catNum=38, time=1626007480}
+DeviceInfo{name='device_7', status=1, catNum=31, time=1626007481}
+DeviceInfo{name='device_5', status=0, catNum=58, time=1626007482}
+DeviceInfo{name='device_10', status=1, catNum=19, time=1626007482}
+DeviceInfo{name='device_3', status=0, catNum=94, time=1626007483}
+DeviceInfo{name='device_1', status=1, catNum=45, time=1626007484}
+DeviceInfo{name='device_7', status=0, catNum=52, time=1626007485}
+DeviceInfo{name='device_5', status=1, catNum=9, time=1626007486}
+DeviceInfo{name='device_10', status=1, catNum=29, time=1626007486}
+DeviceInfo{name='device_3', status=0, catNum=52, time=1626007487}
+DeviceInfo{name='device_1', status=0, catNum=86, time=1626007488}
+DeviceInfo{name='device_7', status=1, catNum=5, time=1626007489}
+DeviceInfo{name='device_5', status=0, catNum=10, time=1626007490}
+DeviceInfo{name='device_10', status=1, catNum=11, time=1626007490}
+DeviceInfo{name='device_3', status=1, catNum=49, time=1626007491}
+DeviceInfo{name='device_1', status=0, catNum=54, time=1626007492}
+DeviceInfo{name='device_7', status=1, catNum=3, time=1626007493}
+DeviceInfo{name='device_5', status=0, catNum=18, time=1626007494}
+DeviceInfo{name='device_10', status=0, catNum=60, time=1626007494}
+DeviceInfo{name='device_3', status=1, catNum=81, time=1626007495}
+DeviceInfo{name='device_1', status=1, catNum=57, time=1626007496}
+DeviceInfo{name='device_7', status=1, catNum=33, time=1626007497}
+DeviceInfo{name='device_5', status=0, catNum=70, time=1626007498}
+DeviceInfo{name='device_10', status=0, catNum=56, time=1626007498}
+DeviceInfo{name='device_3', status=0, catNum=16, time=1626007499}
+DeviceInfo{name='device_1', status=1, catNum=61, time=1626007500}
+DeviceInfo{name='device_7', status=1, catNum=31, time=1626007501}
+DeviceInfo{name='device_5', status=1, catNum=19, time=1626007502}
+DeviceInfo{name='device_10', status=1, catNum=51, time=1626007502}
+DeviceInfo{name='device_3', status=1, catNum=11, time=1626007503}
+DeviceInfo{name='device_1', status=0, catNum=92, time=1626007504}
diff --git a/data.txt/7 b/data.txt/7
new file mode 100644
index 0000000..2134718
--- /dev/null
+++ b/data.txt/7
@@ -0,0 +1,39 @@
+DeviceInfo{name='device_6', status=1, catNum=43, time=1626007474}
+DeviceInfo{name='device_9', status=0, catNum=44, time=1626007474}
+DeviceInfo{name='device_4', status=1, catNum=35, time=1626007475}
+DeviceInfo{name='device_2', status=1, catNum=73, time=1626007476}
+DeviceInfo{name='device_8', status=0, catNum=80, time=1626007477}
+DeviceInfo{name='device_6', status=0, catNum=96, time=1626007478}
+DeviceInfo{name='device_9', status=1, catNum=1, time=1626007478}
+DeviceInfo{name='device_4', status=0, catNum=34, time=1626007479}
+DeviceInfo{name='device_2', status=1, catNum=37, time=1626007480}
+DeviceInfo{name='device_8', status=1, catNum=85, time=1626007481}
+DeviceInfo{name='device_6', status=0, catNum=50, time=1626007482}
+DeviceInfo{name='device_9', status=1, catNum=13, time=1626007482}
+DeviceInfo{name='device_4', status=0, catNum=12, time=1626007483}
+DeviceInfo{name='device_2', status=1, catNum=31, time=1626007484}
+DeviceInfo{name='device_8', status=1, catNum=5, time=1626007485}
+DeviceInfo{name='device_6', status=1, catNum=35, time=1626007486}
+DeviceInfo{name='device_9', status=1, catNum=67, time=1626007486}
+DeviceInfo{name='device_4', status=0, catNum=20, time=1626007487}
+DeviceInfo{name='device_2', status=1, catNum=63, time=1626007488}
+DeviceInfo{name='device_8', status=0, catNum=2, time=1626007489}
+DeviceInfo{name='device_6', status=0, catNum=52, time=1626007490}
+DeviceInfo{name='device_9', status=1, catNum=11, time=1626007490}
+DeviceInfo{name='device_4', status=0, catNum=44, time=1626007491}
+DeviceInfo{name='device_2', status=0, catNum=78, time=1626007492}
+DeviceInfo{name='device_8', status=0, catNum=38, time=1626007493}
+DeviceInfo{name='device_6', status=1, catNum=21, time=1626007494}
+DeviceInfo{name='device_9', status=1, catNum=41, time=1626007494}
+DeviceInfo{name='device_4', status=1, catNum=89, time=1626007495}
+DeviceInfo{name='device_2', status=1, catNum=3, time=1626007496}
+DeviceInfo{name='device_8', status=0, catNum=38, time=1626007497}
+DeviceInfo{name='device_6', status=0, catNum=52, time=1626007498}
+DeviceInfo{name='device_9', status=0, catNum=54, time=1626007498}
+DeviceInfo{name='device_4', status=1, catNum=85, time=1626007499}
+DeviceInfo{name='device_2', status=1, catNum=39, time=1626007500}
+DeviceInfo{name='device_8', status=1, catNum=15, time=1626007501}
+DeviceInfo{name='device_6', status=1, catNum=99, time=1626007502}
+DeviceInfo{name='device_9', status=1, catNum=33, time=1626007502}
+DeviceInfo{name='device_4', status=1, catNum=87, time=1626007503}
+DeviceInfo{name='device_2', status=0, catNum=28, time=1626007504}
diff --git a/data.txt/8 b/data.txt/8
new file mode 100644
index 0000000..9d7c5e1
--- /dev/null
+++ b/data.txt/8
@@ -0,0 +1,39 @@
+DeviceInfo{name='device_7', status=0, catNum=80, time=1626007474}
+DeviceInfo{name='device_5', status=0, catNum=36, time=1626007475}
+DeviceInfo{name='device_10', status=0, catNum=66, time=1626007475}
+DeviceInfo{name='device_3', status=1, catNum=91, time=1626007476}
+DeviceInfo{name='device_1', status=0, catNum=66, time=1626007477}
+DeviceInfo{name='device_7', status=1, catNum=57, time=1626007478}
+DeviceInfo{name='device_5', status=0, catNum=96, time=1626007479}
+DeviceInfo{name='device_10', status=0, catNum=36, time=1626007479}
+DeviceInfo{name='device_3', status=0, catNum=98, time=1626007480}
+DeviceInfo{name='device_1', status=1, catNum=27, time=1626007481}
+DeviceInfo{name='device_7', status=1, catNum=19, time=1626007482}
+DeviceInfo{name='device_5', status=1, catNum=75, time=1626007483}
+DeviceInfo{name='device_10', status=1, catNum=59, time=1626007483}
+DeviceInfo{name='device_3', status=1, catNum=33, time=1626007484}
+DeviceInfo{name='device_1', status=0, catNum=90, time=1626007485}
+DeviceInfo{name='device_7', status=0, catNum=52, time=1626007486}
+DeviceInfo{name='device_5', status=1, catNum=95, time=1626007487}
+DeviceInfo{name='device_10', status=0, catNum=16, time=1626007487}
+DeviceInfo{name='device_3', status=1, catNum=99, time=1626007488}
+DeviceInfo{name='device_1', status=0, catNum=0, time=1626007489}
+DeviceInfo{name='device_7', status=1, catNum=63, time=1626007490}
+DeviceInfo{name='device_5', status=0, catNum=12, time=1626007491}
+DeviceInfo{name='device_10', status=1, catNum=51, time=1626007491}
+DeviceInfo{name='device_3', status=1, catNum=35, time=1626007492}
+DeviceInfo{name='device_1', status=1, catNum=5, time=1626007493}
+DeviceInfo{name='device_7', status=0, catNum=22, time=1626007494}
+DeviceInfo{name='device_5', status=1, catNum=17, time=1626007495}
+DeviceInfo{name='device_10', status=0, catNum=82, time=1626007495}
+DeviceInfo{name='device_3', status=1, catNum=79, time=1626007496}
+DeviceInfo{name='device_1', status=1, catNum=35, time=1626007497}
+DeviceInfo{name='device_7', status=0, catNum=0, time=1626007498}
+DeviceInfo{name='device_5', status=0, catNum=4, time=1626007499}
+DeviceInfo{name='device_10', status=1, catNum=21, time=1626007499}
+DeviceInfo{name='device_3', status=1, catNum=79, time=1626007500}
+DeviceInfo{name='device_1', status=1, catNum=81, time=1626007501}
+DeviceInfo{name='device_7', status=1, catNum=37, time=1626007502}
+DeviceInfo{name='device_5', status=0, catNum=20, time=1626007503}
+DeviceInfo{name='device_10', status=0, catNum=16, time=1626007503}
+DeviceInfo{name='device_3', status=0, catNum=80, time=1626007504}
diff --git a/flink-test.iml b/flink-test.iml
new file mode 100644
index 0000000..83a4e4e
--- /dev/null
+++ b/flink-test.iml
@@ -0,0 +1,86 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..be956f1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,127 @@
+
+
+ 4.0.0
+
+ org.kedacom
+ flink-test
+ 1.0-SNAPSHOT
+
+
+ UTF-8
+ 1.13.0
+ 1.8
+ 2.11
+ 8
+ 8
+ 2.12.1
+
+
+
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+
+
+
+
+ org.apache.flink
+ flink-walkthrough-common_${scala.binary.version}
+ ${flink.version}
+
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-table-planner-blink_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge_${scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-planner-blink_${scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+
+
+
+
+
+
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ ${log4j.version}
+ runtime
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+ runtime
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+ runtime
+
+
+ org.projectlombok
+ lombok
+ RELEASE
+ compile
+
+
+ org.projectlombok
+ lombok
+ RELEASE
+ compile
+
+
+ org.apache.flink
+ flink-connector-kafka_${scala.binary.version}
+ ${flink.version}
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/kedacom/apitest/function/StatusStatisticsByDay.java b/src/main/java/com/kedacom/apitest/function/StatusStatisticsByDay.java
new file mode 100644
index 0000000..ed5f243
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/function/StatusStatisticsByDay.java
@@ -0,0 +1,159 @@
+package com.kedacom.apitest.function;
+
+import com.kedacom.pojo.DeviceInfo;
+import com.kedacom.pojo.DeviceStatusStt;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.curator4.com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.net.URL;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+public class StatusStatisticsByDay {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.getConfig().setAutoWatermarkInterval(100);
+
+ // 文件转流
+ URL resource = TopNFunction.class.getResource("/deviceinfo.txt");
+ DataStreamSource inputStream = env.readTextFile(resource.getPath());
+
+ // 转换pojo
+ DataStream dataStream = inputStream.map(line -> {
+ String[] fields = line.split(",");
+ return new DeviceInfo(fields[0], new Integer(fields[1]), new Integer(fields[2]), new Long(fields[3]));
+ }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
+ @Override
+ public long extractTimestamp(DeviceInfo deviceInfo) {
+ return deviceInfo.getTime() * 1000;
+ }
+ });
+
+ // 自定义keyed state,统计不同name个数,窗口聚合结果
+ SingleOutputStreamOperator> resultStrem = dataStream
+ .keyBy(DeviceInfo::getName)
+ .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
+// .window(TumblingEventTimeWindows.of(Time.seconds(10)))
+ .allowedLateness(Time.seconds(10))
+ .process(new ProcessDayStt());
+
+ resultStrem.print("daystt");
+
+ env.execute("day statistics job");
+ }
+
+ public static class ProcessDayStt extends ProcessWindowFunction, String, TimeWindow> {
+
+ private static MapState daySttMap; // 保持每一天统计数据,可用来做周、月统计。。。
+ private static ValueState dayStt;
+
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ daySttMap = getRuntimeContext().getMapState(new MapStateDescriptor("day-status-stt",
+ Long.class, DeviceStatusStt.class));
+
+ dayStt = getRuntimeContext().getState(new ValueStateDescriptor("day-status-latest-stt", DeviceStatusStt.class));
+ }
+
+ @Override
+ public void process(String s, Context context, Iterable iterable, Collector> collector) throws Exception {
+ DeviceStatusStt latestDayStt = null;
+ DeviceStatusStt curDayStt = new DeviceStatusStt();
+
+ boolean firstDayStt = false;
+
+ // 获取统计时间,以窗口开始时间作为统计时间
+ long beginTime = context.window().getStart();
+ long countTime = context.window().getEnd();
+
+ if(dayStt.value() == null){
+ // 第一次状态填充
+ firstDayStt = true;
+ }else{
+ // 赋值前一天日统计数据
+ latestDayStt = dayStt.value();
+ }
+
+ // 先对集合按照时间升序排序
+ ArrayList deviceInfos = Lists.newArrayList(iterable);
+
+ deviceInfos.sort(new Comparator() {
+ @Override
+ public int compare(DeviceInfo o1, DeviceInfo o2) {
+ if (o1.getTime() > o2.getTime())
+ return 1;
+ else if (o1.getTime() == o2.getTime())
+ return 0;
+ else
+ return -1;
+ }
+ });
+
+ for(int index = 0; index < deviceInfos.size(); index++){
+ if(0 == index){
+ // 第一个状态,且为第一天统计
+ if(firstDayStt){
+ curDayStt.setContinueDurationTime(0);// 持续时间设置为0
+ }else {
+ // 需要和上次状态比较,计算持续时长
+ if(latestDayStt.getStatus() == deviceInfos.get(index).getStatus()){
+ // 持续时长为当前事件时间-上次时间 + 上次持续时长
+ curDayStt.setContinueDurationTime(deviceInfos.get(index).getTime() -
+ beginTime/1000 + latestDayStt.getContinueDurationTime());
+ }else{
+ // 否则状态改变,设置时间为0
+ curDayStt.setContinueDurationTime(0);
+ }
+ }
+ curDayStt.setTotalCarNum(deviceInfos.get(index).getCarNum());// 过车总数设置为第一条数据过车数据
+ }else{
+ // 和当前日统计上一次数据比较
+ if(curDayStt.getStatus() == deviceInfos.get(index).getStatus()){
+ // 状态不变,持续时长增加
+ curDayStt.setContinueDurationTime(deviceInfos.get(index).getTime() -
+ curDayStt.getTime() + curDayStt.getContinueDurationTime());
+ }else{
+ // 状态改变,持续时长为0
+ curDayStt.setContinueDurationTime(0);
+ }
+ // 过车统计
+ curDayStt.setTotalCarNum(curDayStt.getTotalCarNum() + deviceInfos.get(index).getCarNum());
+ }
+ // 赋值其它属性
+ curDayStt.setName(deviceInfos.get(index).getName());
+ curDayStt.setTime(deviceInfos.get(index).getTime());
+ curDayStt.setStatus(deviceInfos.get(index).getStatus());
+ }
+ // 日统计,截止时间要到当天24点,需要计算最后一条记录和24点之间的状态
+ if(deviceInfos.get(deviceInfos.size()-1).getTime() < countTime/1000){
+ curDayStt.setContinueDurationTime(countTime/1000 - deviceInfos.get(deviceInfos.size()-1).getTime() + curDayStt.getContinueDurationTime());
+ }
+
+ // 输出集合
+ collector.collect(new Tuple2<>(new Timestamp(countTime).toString(), curDayStt));
+ // 更新状态数据
+ dayStt.update(curDayStt);
+ }
+ }
+
+}
diff --git a/src/main/java/com/kedacom/apitest/function/TopNFunction.java b/src/main/java/com/kedacom/apitest/function/TopNFunction.java
new file mode 100644
index 0000000..f07b018
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/function/TopNFunction.java
@@ -0,0 +1,161 @@
+package com.kedacom.apitest.function;
+
+import com.kedacom.pojo.CarNumCount;
+import com.kedacom.pojo.DeviceInfo;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.net.URL;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+// topN 过车数据
+public class TopNFunction {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ // 文件转流
+ URL resource = TopNFunction.class.getResource("/deviceinfo.txt");
+ //String file = "E:\\practice\\flink-test\\src\\main\\resources\\deviceinfo.txt";
+ DataStreamSource inputStream = env.readTextFile(resource.getPath());
+
+ // 转换pojo
+ DataStream dataStream = inputStream.map(line -> {
+ String[] fields = line.split(",");
+ return new DeviceInfo(fields[0], new Integer(fields[1]), new Integer(fields[2]), new Long(fields[3]));
+ }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor(){
+
+ @Override
+ public long extractAscendingTimestamp(DeviceInfo deviceInfo) {
+ return deviceInfo.getTime() * 1000;
+ }
+ });
+
+ // 自定义keyed state,统计不同name个数,窗口聚合结果
+ SingleOutputStreamOperator windowAggStream = dataStream.keyBy(DeviceInfo::getName)
+ .timeWindow(Time.seconds(10))
+ .aggregate(new CarCountAgg(), new ResultWindow());
+
+ // 收集统一窗口所有设备过车数,输出topN
+ SingleOutputStreamOperator resultStream = windowAggStream.keyBy(CarNumCount::getTimewindowEnd)
+ .process(new TopNHotCarNum(5));
+
+ resultStream.print();
+
+ env.execute();
+ }
+
+ // 自定义聚合
+ public static class CarCountAgg implements AggregateFunction {
+
+ @Override
+ public Long createAccumulator() {
+ return 0L;
+ }
+
+ @Override
+ public Long add(DeviceInfo deviceInfo, Long aLong) {
+ return aLong + 1;
+ }
+
+ @Override
+ public Long getResult(Long aLong) {
+ return aLong;
+ }
+
+ @Override
+ public Long merge(Long aLong, Long acc1) {
+ return aLong + acc1;
+ }
+ }
+
+ // 自定义全窗口函数
+ public static class ResultWindow implements WindowFunction {
+
+ @Override
+ public void apply(String s, TimeWindow timeWindow, java.lang.Iterable iterable, Collector collector) throws Exception {
+ String name = s;
+ Long windowEnd = timeWindow.getEnd();
+ Integer count = iterable.iterator().next().intValue();
+
+ collector.collect(new CarNumCount(name, count, windowEnd));
+ }
+ }
+
+ // 自定义processfunciton
+ public static class TopNHotCarNum extends KeyedProcessFunction {
+
+ private int topN = 5;
+
+ public TopNHotCarNum(int topN) {
+ this.topN = topN;
+ }
+
+ // 保持当前窗口类所有输出的CarNumCount
+ ListState carNumCountListState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ carNumCountListState = getRuntimeContext().getListState(new ListStateDescriptor("carnum-count-list", CarNumCount.class));
+ }
+
+ @Override
+ public void processElement(CarNumCount carNumCount, Context context, Collector collector) throws Exception {
+ carNumCountListState.add(carNumCount);
+
+ context.timerService().registerEventTimeTimer(carNumCount.getTimewindowEnd() + 1);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+ ArrayList countArrayList = Lists.newArrayList(carNumCountListState.get().iterator());
+
+ countArrayList.sort(new Comparator() {
+ @Override
+ public int compare(CarNumCount o1, CarNumCount o2) {
+ if (o1.getTotalCarNum() > o2.getTotalCarNum())
+ return -1;
+ else if (o1.getTotalCarNum() == o2.getTotalCarNum())
+ return 0;
+ else
+ return 1;
+ }
+ });
+
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("===================\n");
+ stringBuilder.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("\n");
+
+ for (int index = 0; index < Math.min(topN, countArrayList.size()); index++) {
+ CarNumCount carNumCount = countArrayList.get(index);
+ stringBuilder.append("NO ").append(index + 1).append(":\n")
+ .append("name = ").append(carNumCount.getName()).append("\n")
+ .append("totalCatNum = ").append(carNumCount.getTotalCarNum())
+ .append("\n\n");
+ }
+ stringBuilder.append("==========================\n\n");
+
+ TimeUnit.SECONDS.sleep(1);
+
+ out.collect(stringBuilder.toString());
+
+ carNumCountListState.clear();
+ }
+ }
+}
diff --git a/src/main/java/com/kedacom/apitest/sink/KafkaSinkTest.java b/src/main/java/com/kedacom/apitest/sink/KafkaSinkTest.java
new file mode 100644
index 0000000..31ec750
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/sink/KafkaSinkTest.java
@@ -0,0 +1,62 @@
+package com.kedacom.apitest.sink;
+
+import com.kedacom.apitest.source.SelfSourceTest;
+import com.kedacom.pojo.DeviceInfo;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaSinkTest {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+
+ DataStream device = env.addSource(new DeviceSource());
+
+ DataStream transformStream = device.map(dev -> dev.toString());
+
+ transformStream.addSink(new FlinkKafkaProducer("172.16.64.85:9092", "deviceout", new SimpleStringSchema()));
+
+ transformStream.print();
+
+ env.execute("device job");
+ }
+
+ public static class DeviceSource implements SourceFunction {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext sourceContext) throws Exception {
+ while (running) {
+ Random random = new Random();
+
+ Map devsMap = new HashMap<>();
+ for (int index = 0; index < 10; index++) {
+ devsMap.put("device_" + (index + 1), (int) random.nextInt(100));
+ }
+ for (String dev : devsMap.keySet()) {
+ int status = devsMap.get(dev) % 2 == 0 ? 0 : 1;
+ sourceContext.collect(new DeviceInfo(dev, status, devsMap.get(dev), Instant.now().getEpochSecond()));
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+}
diff --git a/src/main/java/com/kedacom/apitest/source/DeviceInfoReading.java b/src/main/java/com/kedacom/apitest/source/DeviceInfoReading.java
new file mode 100644
index 0000000..135c1a1
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/source/DeviceInfoReading.java
@@ -0,0 +1,30 @@
+package com.kedacom.apitest.source;
+
+import com.kedacom.pojo.DeviceInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+
+public class DeviceInfoReading {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream inputDevInfoStream = env.fromCollection(Arrays.asList(new DeviceInfo("dev1", 0, 10, 1625819869),
+ new DeviceInfo("dev2", 1, 20, 1625819869),
+ new DeviceInfo("dev3", 0, 40, 1625819869),
+ new DeviceInfo("dev4", 1, 2, 1625819869),
+ new DeviceInfo("dev5", 1, 5, 1625819869)));
+
+ DataStream integerDataStream = env.fromElements(1, 2, 10, 100, 250, 500);
+
+
+ inputDevInfoStream.print("device");
+
+ integerDataStream.print("int").setParallelism(1);
+
+ env.execute("device job");
+ }
+}
diff --git a/src/main/java/com/kedacom/apitest/source/KafkaSourceTest.java b/src/main/java/com/kedacom/apitest/source/KafkaSourceTest.java
new file mode 100644
index 0000000..5fab6f3
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/source/KafkaSourceTest.java
@@ -0,0 +1,55 @@
+package com.kedacom.apitest.source;
+
+import com.kedacom.pojo.DeviceInfo;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaSourceTest {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ Properties prop = new Properties();
+ prop.setProperty("bootstrap.servers","172.16.64.85:9092");//多个的话可以指定
+ prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+ prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+ // 指定kafka的消费者从哪里开始消费数据
+ // 共有三种方式,
+ // #earliest
+ // 当各分区下有已提交的offset时,从提交的offset开始消费;
+ // 无提交的offset时,从头开始消费
+ // #latest
+ // 当各分区下有已提交的offset时,从提交的offset开始消费;
+ // 无提交的offset时,消费新产生的该分区下的数据
+ // #none
+ // topic各分区都存在已提交的offset时,
+ // 从offset后开始消费;
+ // 只要有一个分区不存在已提交的offset,则抛出异常
+ prop.setProperty("auto.offset.reset","latest");
+
+ //设置checkpoint后在提交offset,即oncheckpoint模式
+ // 该值默认为true,
+ FlinkKafkaConsumer devinfoConsumer = new FlinkKafkaConsumer<>("deviceinfo", new SimpleStringSchema(), prop);
+ devinfoConsumer.setCommitOffsetsOnCheckpoints(true);
+
+ DataStream deviceStream = env.addSource(new FlinkKafkaConsumer("deviceinfo", new SimpleStringSchema(), prop));
+
+ DataStream map = deviceStream.map(dev -> {
+ String[] fields = dev.split(",");
+ return new DeviceInfo(fields[0], new Integer(fields[1]), new Integer(fields[2]), new Long(fields[3]));
+ });
+
+ deviceStream.print("devicd").setParallelism(2);
+
+ map.print("map");
+
+ env.execute("device job");
+ }
+}
diff --git a/src/main/java/com/kedacom/apitest/source/ReduceTest.java b/src/main/java/com/kedacom/apitest/source/ReduceTest.java
new file mode 100644
index 0000000..23c5034
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/source/ReduceTest.java
@@ -0,0 +1,60 @@
+package com.kedacom.apitest.source;
+
+import com.kedacom.pojo.DeviceInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class ReduceTest {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream device = env.addSource(new DeviceSource());
+
+ DataStream reduce = device.
+ filter(dev -> dev.getName().startsWith("device_2")).
+ keyBy(dev -> dev.getName()).
+ reduce((curStatus,newStatus) ->
+ new DeviceInfo(curStatus.getName(), newStatus.getStatus(), curStatus.getCarNum()+newStatus.getCarNum(), newStatus.getTime()));
+
+ reduce.print("reduce");
+
+ env.execute("device job");
+ }
+
+
+ public static class DeviceSource implements SourceFunction {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext sourceContext) throws Exception {
+ while (running) {
+ Random random = new Random();
+
+ Map devsMap = new HashMap<>();
+ for (int index = 0; index < 10; index++) {
+ devsMap.put("device_" + (index + 1), (int) random.nextInt(100));
+ }
+ for (String dev : devsMap.keySet()) {
+ int status = devsMap.get(dev) % 2 == 0 ? 0 : 1;
+ sourceContext.collect(new DeviceInfo(dev, status, devsMap.get(dev), Instant.now().getEpochSecond()));
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+}
diff --git a/src/main/java/com/kedacom/apitest/source/SelfSourceTest.java b/src/main/java/com/kedacom/apitest/source/SelfSourceTest.java
new file mode 100644
index 0000000..924d6e6
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/source/SelfSourceTest.java
@@ -0,0 +1,76 @@
+package com.kedacom.apitest.source;
+
+import com.kedacom.pojo.DeviceInfo;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import javax.xml.crypto.Data;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class SelfSourceTest {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+
+ DataStream device = env.addSource(new DeviceSource());
+
+ DataStream filterStream = device.
+// map( deviceInfo -> deviceInfo).
+ filter(s -> s.getName().startsWith("device_1"));
+
+ KeyedStream keyedStream = device.
+ filter(s -> s.getName().startsWith("device_1")).
+ keyBy(deviceInfo -> deviceInfo.getName());
+ DataStream max = keyedStream.max("carNum");
+ DataStream sum = keyedStream.sum("carNum");
+
+// device.print("devicd").setParallelism(1);
+ filterStream.print("filter");
+// keyedStream.print("keyby");
+ max.print("max").setParallelism(1);
+ sum.print("sum").setParallelism(1);
+
+
+ env.execute("device job");
+ }
+
+ public static class DeviceSource implements SourceFunction {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext sourceContext) throws Exception {
+ while (running) {
+ Random random = new Random();
+
+ Map devsMap = new HashMap<>();
+ for (int index = 0; index < 10; index++) {
+ devsMap.put("device_" + (index + 1), (int) random.nextInt(100));
+ }
+ for (String dev : devsMap.keySet()) {
+ int status = devsMap.get(dev) % 2 == 0 ? 0 : 1;
+ sourceContext.collect(new DeviceInfo(dev, status, devsMap.get(dev), Instant.now().getEpochSecond()));
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+}
diff --git a/src/main/java/com/kedacom/apitest/source/SplitStreamTest.java b/src/main/java/com/kedacom/apitest/source/SplitStreamTest.java
new file mode 100644
index 0000000..d707f8f
--- /dev/null
+++ b/src/main/java/com/kedacom/apitest/source/SplitStreamTest.java
@@ -0,0 +1,103 @@
+package com.kedacom.apitest.source;
+
+import com.kedacom.pojo.DeviceInfo;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.ConnectedStreams;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class SplitStreamTest {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream device = env.addSource(new DeviceSource());
+
+ final OutputTag onlineTag = new OutputTag<>("online", TypeInformation.of(DeviceInfo.class));
+ final OutputTag offlineTag = new OutputTag<>("offline", TypeInformation.of(DeviceInfo.class));
+
+ // 1 分流,online、offline
+ SingleOutputStreamOperator process = device.process(new ProcessFunction() {
+ @Override
+ public void processElement(DeviceInfo deviceInfo, Context context, Collector collector) throws Exception {
+ if (deviceInfo.getStatus() == 1) {
+ context.output(onlineTag, deviceInfo);
+ } else {
+ context.output(offlineTag, deviceInfo);
+ }
+ }
+ });
+
+ process.getSideOutput(onlineTag).print("online");
+ process.getSideOutput(offlineTag).print("offline");
+
+
+ // 2 合流 connect,将在线流转换成元组
+ SingleOutputStreamOperator> onlineStream = process.getSideOutput(onlineTag).map(new MapFunction>() {
+ @Override
+ public Tuple3 map(DeviceInfo deviceInfo) throws Exception {
+ return new Tuple3<>(deviceInfo.getName(), deviceInfo.getCarNum(), deviceInfo.getTime());
+ }
+ });
+
+ ConnectedStreams, DeviceInfo> connect = onlineStream.connect(process.getSideOutput(offlineTag));
+ SingleOutputStreamOperator