-
Notifications
You must be signed in to change notification settings - Fork 247
PySpark
Xiang Zhang edited this page Oct 10, 2022
·
14 revisions
TODO
- pyspark 是 Apache Spark 的 Python API,由官方发布。
- pytispark 是为了 TiSpark 和 pyspark 结合使用的小插件,由 TiSpark 维护者发布。
TiSpark 版本 | Spark 版本 | 是否需要 pytispark |
---|---|---|
< 2.0 | < 2.3 | ✅ |
2.4.x | 2.3.x 2.4.x | pyspark ❎ spark-sumbit ✅ |
2.5.x | > 3.0 | ❎ |
- 在不支持 extension 的 Spark 2.3 之前,TiSpark 通过替换 Spark 类的方式来改变 Spark 执行计划。这带来了一个问题:当我们结合 TiSpark 和 Spark 周边工具使用时,还需要进行额外的适配工作。其中 pytispark 就是为 TiSpark 和 pyspark 结合使用而生。
- Spark 2.3 之后推出了 extension ,TiSpark 抛弃了上述 hack 的方式转而使用 extension。理论上我们无需适配即可使用所有原生的 Spark 工具。但实际上,我们仍可能需 pytispark 来解决 SPARK-25003 带来的问题。需要明确的是,虽然同样是用了 pytispark ,但使用的目的是不一样的。
- Spark 3.0 之后, SPARK-25003 已被解决,我们可以放心大胆的直接使用 pyspark 了。但由于
This session stuff logic is a bit convoluted and many session changes were made. I wouldn't backport it from 3.0 to 2.x unless it's quite serious one.
该 fix 并没有 back port 到 2.3 以及 2.4 版本。如果你想使用 pyspark 与 tispark, 建议使用 spark 3.0 及以上版本
以 pyspark 为例,使用方式与 spark-shell 基本一致,spark-shell 的使用见这里
- 配置 spark-defaults.conf
spark.sql.extensions org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses ${your_pd_adress}
spark.sql.catalog.tidb_catalog org.apache.spark.sql.catalyst.catalog.TiCatalog
spark.sql.catalog.tidb_catalog.pd.addresses ${your_pd_adress}
- 启动 pyspark
pyspark --jars tispark-assembly-{version}.jar
- Read
spark.sql("use tidb_catalog")
spark.sql("select count(*) from ${database}.${table}").show()
- Write
df1 = spark.sql("select * from ${src_database}.${src_table}")
df1.write
.format("tidb")
.option("database", ${target_database})
.option("table", ${target_table})
.option("tidb.addr","127.0.0.1")
.option("tidb.password","")
.option("tidb.port","4000")
.option("tidb.user","root")
.mode("append")
.save()
pyspark 无需 pytispark 。以 Spark 2.4.2 与 tispark-assembly-2.4.3-scala_2.12.jar 为例
- 配置 spark-defaults.conf
spark.sql.extensions org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses ${your_pd_adress}
- 修改
$SPARK_HOME/python/pyspark/sql/session.py
jsparkSession = self._jvm.SparkSession(self._jsc.sc())
to
jsparkSession = self._jvm.SparkSession.builder().getOrCreate()
- 启动 pyspark
pyspark --jars tispark-assembly-2.4.3-scala_2.12.jar
- Read & Write
df1 = spark.sql("select * from test.src_t")
df1.write
.format("tidb")
.option("database", test)
.option("table", target_t)
.option("tidb.addr","127.0.0.1")
.option("tidb.password","")
.option("tidb.port","4000")
.option("tidb.user","root")
.mode("append")
.save()
Spark 2.3 的修改方式可能不同,参考这里
当你使用 spark 2.3 或 2.4 且使用 spark-sumbit 时,你也同时需要 pytispark
-
Use
pip install pytispark
to install pytispark -
Create a Python file named test.py as below:
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("test")
spark.sql("select count(*) from t").show()
- 配置 spark-defaults.conf
spark.sql.extensions org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses ${your_pd_adress}
- sumbit
spark-submit --jars tispark-${name_with_version}.jar test.py
详情参考这里
详情参考这里