Skip to content

Commit 4b462d8

Browse files
author
dapeng
committed
添加缺失的类
1 parent a923fff commit 4b462d8

File tree

2 files changed

+25
-26
lines changed

2 files changed

+25
-26
lines changed

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java renamed to launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,27 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.sink.clickhouse;
19+
package com.dtstack.flink.sql.launcher.perjob;
2020

21-
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
21+
import org.apache.commons.io.Charsets;
2222

23-
import java.util.Optional;
23+
import java.io.UnsupportedEncodingException;
24+
import java.net.URLDecoder;
25+
import java.util.Arrays;
26+
import java.util.List;
2427

2528
/**
26-
* Date: 2020/1/15
29+
* Date: 2019/12/28
2730
* Company: www.dtstack.com
2831
* @author maqi
2932
*/
30-
public class ClickhouseDialect implements JDBCDialect {
33+
public class ConfigParseUtil {
3134

32-
@Override
33-
public boolean canHandle(String url) {
34-
return url.startsWith("jdbc:clickhouse:");
35-
}
36-
37-
@Override
38-
public Optional<String> defaultDriverName() {
39-
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
40-
}
41-
42-
@Override
43-
public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
44-
throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode");
35+
public static List<String> parsePathFromStr(String pathStr) throws UnsupportedEncodingException {
36+
String addjarPath = URLDecoder.decode(pathStr, Charsets.UTF_8.toString());
37+
if (addjarPath.length() > 2) {
38+
addjarPath = addjarPath.substring(1,addjarPath.length()-1).replace("\"","");
39+
}
40+
return Arrays.asList(addjarPath.split(","));
4541
}
4642
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.slf4j.LoggerFactory;
3939

4040
import java.io.File;
41+
import java.io.UnsupportedEncodingException;
4142
import java.net.MalformedURLException;
4243
import java.net.URL;
4344
import java.util.ArrayList;
@@ -82,7 +83,7 @@ public void init(String yarnConfDir, Configuration flinkConfig, Properties userC
8283
}
8384

8485
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph)
85-
throws MalformedURLException {
86+
throws MalformedURLException, UnsupportedEncodingException {
8687

8788
String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? DEFAULT_CONF_DIR : launcherOptions.getFlinkconf();
8889
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, flinkConf);
@@ -106,13 +107,7 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJ
106107
} else {
107108
throw new RuntimeException("The Flink jar path is null");
108109
}
109-
// add user customized file to shipfile
110-
if (!StringUtils.isBlank(launcherOptions.getAddShipfile())) {
111-
List<String> paths = ConfigParseUtil.parsePathFromStr(launcherOptions.getAddShipfile());
112-
paths.forEach(path -> {
113-
shipFiles.add(new File(path));
114-
});
115-
}
110+
116111
// classpath , all node need contain plugin jar
117112
String pluginLoadMode = launcherOptions.getPluginLoadMode();
118113
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
@@ -125,6 +120,14 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJ
125120
+ " Currently only classpath and shipfile are supported.");
126121
}
127122

123+
// add user customized file to shipfile
124+
if (!StringUtils.isBlank(launcherOptions.getAddShipfile())) {
125+
List<String> paths = ConfigParseUtil.parsePathFromStr(launcherOptions.getAddShipfile());
126+
paths.forEach(path -> {
127+
shipFiles.add(new File(path));
128+
});
129+
}
130+
128131
clusterDescriptor.addShipFiles(shipFiles);
129132
clusterDescriptor.setName(launcherOptions.getName());
130133
String queue = launcherOptions.getQueue();

0 commit comments

Comments
 (0)