diff --git a/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java
index b242d6fb48..e138ade47e 100644
--- a/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java
+++ b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java
@@ -36,7 +36,7 @@ public class FlinkOptimizer extends Optimizer {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkOptimizer.class);
 
-  private static final String JOB_NAME = "amoro-flink-optimizer";
+  private static final String JOB_NAME_FORMAT = "amoro-flink-optimizer-%s";
 
   public FlinkOptimizer(OptimizerConfig config) {
     super(config, () -> new OptimizerToucher(config), (i) -> new FlinkOptimizerExecutor(config, i));
@@ -64,7 +64,7 @@ public static void main(String[] args) throws CmdLineException {
         .setParallelism(1);
 
     try {
-      env.execute(JOB_NAME);
+      env.execute(String.format(JOB_NAME_FORMAT, optimizerConfig.getResourceId()));
     } catch (Exception e) {
       LOG.error("Execute flink optimizer failed", e);
     }
diff --git a/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java
index 6e21e59cb0..14ade241c3 100644
--- a/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java
+++ b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java
@@ -31,7 +31,7 @@
 /** The {@code SparkOptimizer} acts as an entrypoint of the spark program */
 public class SparkOptimizer extends Optimizer {
   private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizer.class);
-  private static final String APP_NAME = "amoro-spark-optimizer";
+  private static final String APP_NAME_FORMAT = "amoro-spark-optimizer-%s";
 
   public SparkOptimizer(OptimizerConfig config, JavaSparkContext jsc) {
     super(
@@ -41,9 +41,12 @@ public SparkOptimizer(OptimizerConfig config, JavaSparkContext jsc) {
   }
 
   public static void main(String[] args) throws Exception {
-    SparkSession spark = SparkSession.builder().appName(APP_NAME).getOrCreate();
-    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
     OptimizerConfig config = new OptimizerConfig(args);
+    SparkSession spark =
+        SparkSession.builder()
+            .appName(String.format(APP_NAME_FORMAT, config.getResourceId()))
+            .getOrCreate();
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
     if (!jsc.getConf().getBoolean("spark.dynamicAllocation.enabled", false)) {
       LOG.warn(
           "To better utilize computing resources, it is recommended to enable 'spark.dynamicAllocation.enabled' "