From b2c9d7c2ef950dcc7b2dcb2eba35ecfa60441c75 Mon Sep 17 00:00:00 2001 From: liubin Date: Thu, 11 Jun 2020 19:56:48 +0800 Subject: [PATCH] =?UTF-8?q?local=E6=A8=A1=E5=BC=8Ftaskmanager.numberOfTask?= =?UTF-8?q?Slots=E6=8C=87=E5=AE=9A=E5=A4=B1=E6=95=88=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/flink/sql/environment/MyLocalStreamEnvironment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java index 769f9f462..863bb7796 100644 --- a/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java +++ b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java @@ -105,13 +105,13 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { configuration.addAll(jobGraph.getJobConfiguration()); configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "512M"); - configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS.key(), jobGraph.getMaximumParallelism()); // add (and override) the settings with what the user defined configuration.addAll(this.conf); MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder(); configBuilder.setConfiguration(configuration); + configBuilder.setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism()); if (LOG.isInfoEnabled()) { LOG.info("Running job on local embedded Flink mini cluster");