From 06ee07290e71b2eca145793a43af2536ebe54423 Mon Sep 17 00:00:00 2001 From: Eagle is pig monster <1050096872@qq.com> Date: Sun, 5 May 2019 18:46:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=88=A0=E9=99=A4=20siddhi?= =?UTF-8?q?=20=E8=A7=84=E5=88=99=E6=8A=A5=E9=94=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在程序运行时,删除已有 siddhi 规则,发生报错 报错信息为: java.util.ConcurrentModificationException at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442) at java.util.HashMap$KeyIterator.next(HashMap.java:1466) at org.apache.flink.streaming.siddhi.router.AddRouteOperator.handleMetadataControlEvent(AddRouteOperator.java:103) at org.apache.flink.streaming.siddhi.router.AddRouteOperator.processElement(AddRouteOperator.java:62) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) --- .../siddhi/router/AddRouteOperator.java | 13 +++--- pom.xml | 40 ++++++++++--------- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java b/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java index 5b761cd..31062b3 100644 --- a/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java +++ b/core/src/main/java/org/apache/flink/streaming/siddhi/router/AddRouteOperator.java @@ -27,12 +27,7 @@ import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema; import org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; public class AddRouteOperator extends AbstractStreamOperator> @@ -100,10 +95,12 @@ public void processElement(StreamRecord> element) th private void handleMetadataControlEvent(MetadataControlEvent event) throws Exception { if (event.getDeletedExecutionPlanId() != null) { for (String executionPlanId : event.getDeletedExecutionPlanId()) { - for (String inputStreamId : inputStreamToExecutionPlans.keySet()) { + for (Iterator>> it = inputStreamToExecutionPlans.entrySet().iterator(); it.hasNext();) { + Map.Entry> entry = it.next(); + String inputStreamId = entry.getKey(); inputStreamToExecutionPlans.get(inputStreamId).remove(executionPlanId); if (inputStreamToExecutionPlans.get(inputStreamId).isEmpty()) { - inputStreamToExecutionPlans.remove(inputStreamId); + it.remove(); } } executionPlanIdToPartitionKeys.remove(executionPlanId); diff --git a/pom.xml b/pom.xml index bb8c4da..be0a9b8 100755 --- a/pom.xml +++ b/pom.xml @@ -159,28 +159,19 @@ under the License. - org.apache.maven.plugins maven-deploy-plugin - 2.8.1 - - true - + 2.8.2 + default-deploy deploy - - jar - true - https://clojars.org/repo - clojars - ${project.artifactId} - ${project.groupId} - ${project.version} - ${project.build.directory}/${project.build.finalName}.jar - - deploy-file + deploy + + @@ -199,12 +190,25 @@ under the License. - - + + + + + nexus-releases + Nexus Release Repository + http://10.143.132.59:8081/repository/maven-releases/ + + + nexus-snapshots + Nexus Snapshot Repository + http://10.143.132.59:8081/repository/maven-snapshots/ +