diff --git a/README.en-US.md b/README.en-US.md index bfd4db795..402a03ee2 100644 --- a/README.en-US.md +++ b/README.en-US.md @@ -32,16 +32,16 @@ Welcome everyone to join the Alink open source user group to communicate. About package names and versions: - PyAlink provides different Python packages for Flink versions that Alink supports: - package `pyalink` always maintains Alink Python API against the latest Flink version, which is 1.10, - while `pyalink-flink-***` support old-version Flink, which are `pyalink-flink-1.9` for now. - - The version of python packages always follows Alink Java version, like `1.1.0`. + package `pyalink` always maintains Alink Python API against the latest Flink version, which is 1.11, + while `pyalink-flink-***` support old-version Flink, which are `pyalink-flink-1.10` and `pyalink-flink-1.9` for now. + - The version of python packages always follows Alink Java version, like `1.2.0`. Installation steps: 1. Make sure the version of python3 on your computer is 3.6 or 3.7. 2. Make sure Java 8 is installed on your computer. 3. Use pip to install: - `pip install pyalink` or `pip install pyalink-flink-1.9`. + `pip install pyalink`, `pip install pyalink-flink-1.10` or `pip install pyalink-flink-1.9`. Potential issues: @@ -50,14 +50,15 @@ Potential issues: If `pyalink` or `pyalink-flink-***` was/were installed, please use `pip uninstall pyalink` or `pip uninstall pyalink-flink-***` to remove them. 2. If `pip install` is slow of failed, refer to [this article](https://segmentfault.com/a/1190000006111096) to change the pip source, or use the following download links: - - Flink 1.10:[Link](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.1.2.post0/pyalink-1.1.2.post0-py3-none-any.whl) (MD5: 6bf3a50a4437116793149ead57d9793c) - - Flink 1.9: [Link](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.1.2.post0/pyalink_flink_1.9-1.1.2.post0-py3-none-any.whl) (MD5: e6d2a0ba3549662d77b51a4a37483479) + - Flink 1.11:[Link](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.2.0/pyalink-1.2.0-py3-none-any.whl) (MD5: 8a38e8009712afcbf5ecdb297db3c8ac) + - Flink 1.10:[Link](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.2.0/pyalink_flink_1.10-1.2.0-py3-none-any.whl) (MD5: 8a38e8009712afcbf5ecdb297db3c8ac) + - Flink 1.9: [Link](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.2.0/pyalink_flink_1.9-1.2.0-py3-none-any.whl) (MD5: 2f25eec02a692661c0f0f3a89e5c2f0c) 3. If multiple version of Python exist, you may need to use a special version of `pip`, like `pip3`; If Anaconda is used, the command should be run in Anaconda prompt. ### Start using: ------- -We recommend using Jupyter Notebook to use PyAlink to provide a better experience. +You can start using PyAlink with Jupyter Notebook to provide a better experience. Steps for usage: @@ -70,7 +71,7 @@ Steps for usage: ```useLocalEnv(parallism, flinkHome=None, config=None)```. - Among them, the parameter ```parallism``` indicates the degree of parallelism used for execution;```flinkHome``` is the full path of flink,and the default flink-1.9.0 path of PyAlink is used; ```config``` is the configuration parameter accepted by Flink. After running, the following output appears, indicating that the initialization of the running environment is successful. + Among them, the parameter ```parallism``` indicates the degree of parallelism used for execution;```flinkHome``` is the full path of flink, and usually no need to set; ```config``` is the configuration parameter accepted by Flink. After running, the following output appears, indicating that the initialization of the running environment is successful. ``` JVM listening on *** Python listening on *** @@ -87,19 +88,19 @@ print(df) ### Write code: ------ -In PyAlink, the interface provided by the algorithm component is basically the same as the Java API, that is, an algorithm component is created through the default construction method, then the parameters are set through ```setXXX```, and other components are connected through ```link / linkTo / linkFrom```. +In PyAlink, the interface provided by the algorithm component is basically the same as the Java APIs, that is, an algorithm component is created through the default construction method, then the parameters are set through ```setXXX```, and other components are connected through ```link / linkTo / linkFrom```. -Here, Jupyter's auto-completion mechanism can be used to provide writing convenience. +Here, Jupyter Notebook's auto-completion mechanism can be used to provide writing convenience. For batch jobs, you can trigger execution through methods such as ```print / collectToDataframe / collectToDataframes``` of batch components or ```BatchOperator.execute ()```; for streaming jobs, start the job with ```StreamOperator.execute ()```. ### More usage: ------ - - [PyAlink Tutorial](docs/pyalink/pyalink-overview.md) - [Interchange between DataFrame and Operator](docs/pyalink/pyalink-dataframe.md) - [StreamOperator data preview](docs/pyalink/pyalink-stream-operator-preview.md) - [UDF/UDTF/SQL usage](docs/pyalink/pyalink-udf.md) - [Use with PyFlink](docs/pyalink/pyalink-pyflink.md) + - [PyAlink Q&A](docs/pyalink/pyalink-qa.md) ## Java API Manual @@ -126,12 +127,31 @@ Pipeline pipeline = new Pipeline().add(va).add(kMeans); pipeline.fit(data).transform(data).print(); ``` +### With Flink-1.11 +```xml + + com.alibaba.alink + alink_core_flink-1.11_2.11 + 1.2.0 + + + org.apache.flink + flink-streaming-scala_2.11 + 1.11.0 + + + org.apache.flink + flink-table-planner_2.11 + 1.11.0 + +``` + ### With Flink-1.10 ```xml com.alibaba.alink alink_core_flink-1.10_2.11 - 1.1.2 + 1.2.0 org.apache.flink @@ -151,7 +171,7 @@ pipeline.fit(data).transform(data).print(); com.alibaba.alink alink_core_flink-1.9_2.11 - 1.1.2 + 1.2.0 org.apache.flink @@ -171,8 +191,8 @@ Run Alink Algorithm with a Flink Cluster 1. Prepare a Flink Cluster: ```shell - wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz - tar -xf flink-1.10.0-bin-scala_2.11.tgz && cd flink-1.10.0 + wget https://archive.apache.org/dist/flink/flink-1.11.0/flink-1.11.0-bin-scala_2.11.tgz + tar -xf flink-1.11.0-bin-scala_2.11.tgz && cd flink-1.11.0 ./bin/start-cluster.sh ``` @@ -185,7 +205,7 @@ Run Alink Algorithm with a Flink Cluster 3. Run Java examples: ```shell ./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar - # ./bin/flink run -p 2 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar - # ./bin/flink run -p 2 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar + # ./bin/flink run -p 1 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar + # ./bin/flink run -p 1 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar ``` diff --git a/README.md b/README.md index 276030f7e..ef8bd2658 100644 --- a/README.md +++ b/README.md @@ -31,37 +31,37 @@ 包名和版本说明: - PyAlink 根据 Alink 所支持的 Flink 版本提供不同的 Python 包: -其中,`pyalink` 包对应为 Alink 所支持的最新 Flink 版本,而 `pyalink-flink-***` 为旧版本的 Flink 版本,当前提供 `pyalink-flink-1.9`。 - - Python 包的版本号与 Alink 的版本号一致,例如`1.1.0`。 +其中,`pyalink` 包对应为 Alink 所支持的最新 Flink 版本,当前为 1.11,而 `pyalink-flink-***` 为旧版本的 Flink 版本,当前提供 `pyalink-flink-1.10` 和 `pyalink-flink-1.9`。 + - Python 包的版本号与 Alink 的版本号一致,例如`1.2.0`。 安装步骤: 1. 确保使用环境中有Python3,版本限于 3.6 和 3.7。 2. 确保使用环境中安装有 Java 8。 3. 使用 pip 命令进行安装: - `pip install pyalink` 或者 `pip install pyalink-flink-1.9`。 + `pip install pyalink`、`pip install pyalink-flink-1.10` 或者 `pip install pyalink-flink-1.9`。 安装注意事项: 1. `pyalink` 和 `pyalink-flink-***` 不能同时安装,也不能与旧版本同时安装。 如果之前安装过 `pyalink` 或者 `pyalink-flink-***`,请使用`pip uninstall pyalink` 或者 `pip uninstall pyalink-flink-***` 卸载之前的版本。 2. 出现`pip`安装缓慢或不成功的情况,可以参考[这篇文章](https://segmentfault.com/a/1190000006111096)修改pip源,或者直接使用下面的链接下载 whl 包,然后使用 `pip` 安装: - - Flink 1.10:[链接](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.1.2.post0/pyalink-1.1.2.post0-py3-none-any.whl) (MD5: 6bf3a50a4437116793149ead57d9793c) - - Flink 1.9: [链接](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.1.2.post0/pyalink_flink_1.9-1.1.2.post-py3-none-any.whl) (MD5: e6d2a0ba3549662d77b51a4a37483479) + - Flink 1.11:[链接](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.2.0/pyalink-1.2.0-py3-none-any.whl) (MD5: 8a38e8009712afcbf5ecdb297db3c8ac) + - Flink 1.10:[链接](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.2.0/pyalink_flink_1.10-1.2.0-py3-none-any.whl) (MD5: 8a38e8009712afcbf5ecdb297db3c8ac) + - Flink 1.9: [链接](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.2.0/pyalink_flink_1.9-1.2.0-py3-none-any.whl) (MD5: 2f25eec02a692661c0f0f3a89e5c2f0c) 3. 如果有多个版本的 Python,可能需要使用特定版本的 `pip`,比如 `pip3`;如果使用 Anaconda,则需要在 Anaconda 命令行中进行安装。 ### 开始使用: ------- -我们推荐通过 Jupyter Notebook 来使用 PyAlink,能获得更好的使用体验。 +可以通过 Jupyter Notebook 来开始使用 PyAlink,能获得更好的使用体验。 使用步骤: 1. 在命令行中启动Jupyter:`jupyter notebook`,并新建 Python 3 的 Notebook 。 2. 导入 pyalink 包:`from pyalink.alink import *`。 3. 使用方法创建本地运行环境: `useLocalEnv(parallism, flinkHome=None, config=None)`。 -其中,参数 `parallism` 表示执行所使用的并行度;`flinkHome` 为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径;`config`为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功: +其中,参数 `parallism` 表示执行所使用的并行度;`flinkHome` 为 flink 的完整路径,一般情况不需要设置;`config`为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功: ``` JVM listening on *** -Python listening on *** ``` 4. 开始编写 PyAlink 代码,例如: ```python @@ -76,18 +76,18 @@ print(df) ### 编写代码: ------ 在 PyAlink 中,算法组件提供的接口基本与 Java API 一致,即通过默认构造方法创建一个算法组件,然后通过 `setXXX` 设置参数,通过 `link/linkTo/linkFrom` 与其他组件相连。 -这里利用 Jupyter 的自动补全机制可以提供书写便利。 +这里利用 Jupyter Notebook 的自动补全机制可以提供书写便利。 对于批式作业,可以通过批式组件的 `print/collectToDataframe/collectToDataframes` 等方法或者 `BatchOperator.execute()` 来触发执行;对于流式作业,则通过 `StreamOperator.execute()` 来启动作业。 ### 更多用法: ------ - - [PyAlink 使用介绍](docs/pyalink/pyalink-overview.md) - [DataFrame 与 Operator 互转](docs/pyalink/pyalink-dataframe.md) - [StreamOperator 数据预览](docs/pyalink/pyalink-stream-operator-preview.md) - [UDF/UDTF/SQL 使用](docs/pyalink/pyalink-udf.md) - [与 PyFlink 一同使用](docs/pyalink/pyalink-pyflink.md) + - [PyAlink 常见问题](docs/pyalink/pyalink-qa.md) ## Java 接口使用介绍 ---------- @@ -116,12 +116,31 @@ Pipeline pipeline = new Pipeline().add(va).add(kMeans); pipeline.fit(data).transform(data).print(); ``` +### Flink-1.11 的 Maven 依赖 +```xml + + com.alibaba.alink + alink_core_flink-1.11_2.11 + 1.2.0 + + + org.apache.flink + flink-streaming-scala_2.11 + 1.11.0 + + + org.apache.flink + flink-table-planner_2.11 + 1.11.0 + +``` + ### Flink-1.10 的 Maven 依赖 ```xml com.alibaba.alink alink_core_flink-1.10_2.11 - 1.1.2 + 1.2.0 org.apache.flink @@ -141,7 +160,7 @@ pipeline.fit(data).transform(data).print(); com.alibaba.alink alink_core_flink-1.9_2.11 - 1.1.2 + 1.2.0 org.apache.flink @@ -162,8 +181,8 @@ pipeline.fit(data).transform(data).print(); 1. 准备Flink集群 ```shell - wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz - tar -xf flink-1.10.0-bin-scala_2.11.tgz && cd flink-1.10.0 + wget https://archive.apache.org/dist/flink/flink-1.11.0/flink-1.10.0-bin-scala_2.11.tgz + tar -xf flink-1.11.0-bin-scala_2.11.tgz && cd flink-1.11.0 ./bin/start-cluster.sh ``` @@ -176,6 +195,6 @@ pipeline.fit(data).transform(data).print(); 3. 运行Java示例 ```shell ./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar - # ./bin/flink run -p 2 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar - # ./bin/flink run -p 2 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar + # ./bin/flink run -p 1 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar + # ./bin/flink run -p 1 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar ``` diff --git a/docs/pyalink/pyalink-overview.md b/docs/pyalink/pyalink-overview.md deleted file mode 100644 index cf2f3c523..000000000 --- a/docs/pyalink/pyalink-overview.md +++ /dev/null @@ -1,105 +0,0 @@ -PyAlink 使用介绍 -=============== - - -使用前准备: ---------- - -包名和版本说明: - - - PyAlink 根据 Alink 所支持的 Flink 版本提供不同的 Python 包: -其中,`pyalink` 包对应为 Alink 所支持的最新 Flink 版本,而 `pyalink-flink-***` 为旧版本的 Flink 版本,当前提供 `pyalink-flink-1.9`。 - - Python 包的版本号与 Alink 的版本号一致,例如`1.1.0`。 - -安装步骤: -1. 确保使用环境中有Python3,版本限于 3.6 和 3.7。 -2. 确保使用环境中安装有 Java 8。 -3. 使用 pip 命令进行安装: - `pip install pyalink` 或者 `pip install pyalink-flink-1.9`。 - -安装注意事项: - -1. `pyalink` 和 `pyalink-flink-***` 不能同时安装,也不能与旧版本同时安装。 -如果之前安装过 `pyalink` 或者 `pyalink-flink-***`,请使用`pip uninstall pyalink` 或者 `pip uninstall pyalink-flink-***` 卸载之前的版本。 -2. 出现`pip`安装缓慢或不成功的情况,可以参考[这篇文章](https://segmentfault.com/a/1190000006111096)修改pip源,或者直接使用下面的链接下载 whl 包,然后使用 `pip` 安装: - - Flink 1.10:[链接](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.1.2.post0/pyalink-1.1.2.post0-py3-none-any.whl) (MD5: 6bf3a50a4437116793149ead57d9793c) - - Flink 1.9: [链接](https://alink-release.oss-cn-beijing.aliyuncs.com/v1.1.2.post0/pyalink_flink_1.9-1.1.2.post0-py3-none-any.whl) (MD5: e6d2a0ba3549662d77b51a4a37483479) -3. 如果有多个版本的 Python,可能需要使用特定版本的 `pip`,比如 `pip3`;如果使用 Anaconda,则需要在 Anaconda 命令行中进行安装。 - -开始使用: -------- -我们推荐通过 Jupyter Notebook 来使用 PyAlink,能获得更好的使用体验。 - -使用步骤: -1. 在命令行中启动Jupyter:```jupyter notebook```,并新建 Python 3 的 Notebook 。 -2. 导入 pyalink 包:```from pyalink.alink import *```。 -3. 使用方法创建本地运行环境: -```useLocalEnv(parallism, flinkHome=None, config=None)```。 -其中,参数 ```parallism``` 表示执行所使用的并行度;```flinkHome``` 为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径;```config```为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功: -``` -JVM listening on *** -Python listening on *** -``` -4. 开始编写 PyAlink 代码,例如: -``` -source = CsvSourceBatchOp()\ - .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\ - .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv") -res = source.select(["sepal_length", "sepal_width"]) -df = res.collectToDataframe() -print(df) -``` - -编写代码: ------- -在 PyAlink 中,算法组件提供的接口基本与 Java API 一致,即通过默认构造方法创建一个算法组件,然后通过 ```setXXX``` 设置参数,通过 ```link/linkTo/linkFrom``` 与其他组件相连。 -这里利用 Jupyter 的自动补全机制可以提供书写便利。 - -对于批式作业,可以通过批式组件的 ```print/collectToDataframe/collectToDataframes``` 等方法或者 ```BatchOperator.execute()``` 来触发执行;对于流式作业,则通过 ```StreamOperator.execute()``` 来启动作业。 - - - -更多用法: ------- - - [DataFrame与Operator互转](pyalink-dataframe.md) - - [StreamOperator数据预览](pyalink-stream-operator-preview.md) - - [UDF使用](pyalink-udf.md) - - [与 PyFlink 一同使用](pyalink-pyflink.md) - - -Q&A: ----- -Q:安装 PyAlink 后,使用时报错:```AttributeError: 'NoneType' object has no attribute 'jvm```,如何解决? - -A:这个报错信息是因为 PyAlink 的 Java 部分没有成功启动导致的: - - 请先检查是否正确安装 Java 8,可以在 Jupyter 中直接运行 ```!java --version```,如果正确显示版本号(比如 1.8.*)则正常,否则请安装 Java 8,并检查环境变量是否正确。 - - 在 Jupyter 中运行```import pyalink; print(pyalink.__path__)```,应该输出一个路径。 - 请使用系统的文件管理工具定位到这个目录,如果这个目录包含有名为 ```alink``` 和 ```lib``` 目录则正常,否则 pyalink 安装有问题,请卸载重装。 ----- - -Q:能否连接远程 Flink 集群进行计算? - -A:通过方法可以连接一个已经启动的 Flink 集群:```useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)```。其中,参数 - - ```host``` 和 ```port``` 表示集群的地址; - - ```parallelism``` 表示执行作业的并行度; - - ```flinkHome``` 为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径; - - ```localIp``` 指定实现 ```Flink DataStream``` 的打印预览功能时所需的本机IP地址,需要 Flink 集群能访问。默认为```localhost```。 - - ```shipAlinkAlgoJar``` 是否将 PyAlink 提供的 Alink 算法包传输给远程集群,如果远程集群已经放置了 Alink 算法包,那么这里可以设为 False,减少数据传输。 - - Flink-1.10 及以上版本对应的 pyalink 包,还支持类似 pyflink 脚本的远程集群运行方式。 - ------ - -Q:如何停止长时间运行的Flink作业? - -A:使用本地执行环境时,使用 Notebook 提供的“停止”按钮即可。 -使用远程集群时,需要使用集群提供的停止作业功能。 - ------ - -Q:能否直接使用 Python 脚本而不是 Notebook 运行? - -A:可以。但需要在代码最后调用 resetEnv(),否则脚本不会退出。 - ------ - diff --git a/docs/pyalink/pyalink-pyflink.md b/docs/pyalink/pyalink-pyflink.md index eacac52a6..76fa31039 100644 --- a/docs/pyalink/pyalink-pyflink.md +++ b/docs/pyalink/pyalink-pyflink.md @@ -4,7 +4,7 @@ 在最新的发布中,PyAlink 与 PyFlink 进行了一定的整合。 用户在新版本的 PyAlink 中能够使用 PyFlink 的部分功能,同时 PyAlink 脚本也支持像 PyFlink 脚本一样使用 `flink run` 来提交作业了。 -需要注意的是:这个版本只有 Flink-1.10 对应的 Python 包 `pyalink` 才具有,`pyalink-flink-1.9` 没有以下功能。 +需要注意的是:这个版本只有 Flink-1.10 及以上对应的 Python 包 `pyalink` 才具有,`pyalink-flink-1.9` 没有以下功能。 一个简单的例子 @@ -27,7 +27,7 @@ StreamOperator.execute() 这段代码示例既可以直接在 Notebook 中运行,也可以直接保存成`.py` 的脚本文件,使用 PyFlink 脚本的运行方式来执行: 1. `python ***.py`: 直接使用本地运行环境; -2. `flink run -py ***.py`:将脚本提交给远程集群来运行,参考 [Job Submission Examples](https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html#job-submission-examples)。 +2. `flink run -py ***.py`:将脚本提交给远程集群来运行,参考 [Job Submission Examples](https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples)。 ### 与 PyFlink 共用执行环境 diff --git a/docs/pyalink/pyalink-qa.md b/docs/pyalink/pyalink-qa.md new file mode 100644 index 000000000..d25c1527b --- /dev/null +++ b/docs/pyalink/pyalink-qa.md @@ -0,0 +1,37 @@ +PyAlink 常见问题 +=============== + +Q:安装 PyAlink 后,使用时报错:```AttributeError: 'NoneType' object has no attribute 'jvm```,如何解决? + +A:这个报错信息是因为 PyAlink 的 Java 部分没有成功启动导致的: + - 请先检查是否正确安装 Java 8,可以在 Jupyter 中直接运行 ```!java --version```,如果正确显示版本号(比如 1.8.*)则正常,否则请安装 Java 8,并检查环境变量是否正确。 + - 在 Jupyter 中运行```import pyalink; print(pyalink.__path__)```,应该输出一个路径。 + 请使用系统的文件管理工具定位到这个目录,如果这个目录包含有名为 ```alink``` 和 ```lib``` 目录则正常,否则 pyalink 安装有问题,请卸载重装。 +---- + +Q:能否连接远程 Flink 集群进行计算? + +A:通过方法可以连接一个已经启动的 Flink 集群:```useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)```。其中,参数 + - ```host``` 和 ```port``` 表示集群的地址; + - ```parallelism``` 表示执行作业的并行度; + - ```flinkHome``` 为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径; + - ```localIp``` 指定实现 ```Flink DataStream``` 的打印预览功能时所需的本机IP地址,需要 Flink 集群能访问。默认为```localhost```。 + - ```shipAlinkAlgoJar``` 是否将 PyAlink 提供的 Alink 算法包传输给远程集群,如果远程集群已经放置了 Alink 算法包,那么这里可以设为 False,减少数据传输。 + + Flink-1.10 及以上版本对应的 pyalink 包,还支持类似 pyflink 脚本的远程集群运行方式。 + +----- + +Q:如何停止长时间运行的Flink作业? + +A:使用本地执行环境时,使用 Notebook 提供的“停止”按钮即可。 +使用远程集群时,需要使用集群提供的停止作业功能。 + +----- + +Q:能否直接使用 Python 脚本而不是 Notebook 运行? + +A:可以。但需要在代码最后调用 resetEnv(),否则脚本不会退出。 + +----- +