Skip to content

PySpark

shiyuhang0 edited this page Jun 7, 2022 · 14 revisions

EN

TODO

CN

pyspark & pytispark

  • pyspark 是 Apache Spark 的 Python API,由官方发布。
  • pytispark 是为了 TiSpark 和 pyspark 结合使用的小插件,由 TiSpark 维护者发布。

何时使用 pytispark

TiSpark 版本 Spark 版本 是否需要 pytispark
< 2.0 < 2.3
2.4.x 2.3.x 2.4.x
2.5.x > 3.0
  • 在不支持 extension 的 Spark 2.3 之前,TiSpark 通过替换 Spark 类的方式来改变 Spark 执行计划。这带来了一个问题:当我们结合 TiSpark 和 Spark 周边工具使用时,还需要进行额外的适配工作。其中 pytispark 就是为 TiSpark 和 pyspark 结合使用而生。
  • Spark 2.3 之后推出了 extension ,TiSpark 抛弃了上述 hack 的方式转而使用 extension。理论上我们无需适配即可使用所有原生的 Spark 工具。但实际上,我们仍需 pytispark 来解决 [SPARK-25003] 带来的问题。需要明确的是,虽然同样是用了 pytispark ,但使用的目的是不一样的。
  • Spark 3.0 之后, [SPARK-25003] 已被解决,我们可以放心大胆的直接使用 pyspark 了

不使用 pytispark (Spark >= 3.0)

使用方式与 spark-shell 基本一致,spark-shell 的使用见这里

  1. 配置 spark-defaults.conf
spark.sql.extensions  org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses  ${your_pd_adress}
spark.sql.catalog.tidb_catalog  org.apache.spark.sql.catalyst.catalog.TiCatalog
spark.sql.catalog.tidb_catalog.pd.addresses  ${your_pd_adress}
  1. 启动 pyspark
pyspark --jars tispark-assembly-{version}.jar
  1. Read
spark.sql("use tidb_catalog")
spark.sql("select count(*) from ${database}.${table}").show()
  1. Write
df1 = spark.sql("select * from ${src_database}.${src_table}").show()

df1.write
.format("tidb")
.option("database", ${target_database})
.option("table", ${target_table})
.option("tidb.addr","127.0.0.1")
.option("tidb.password","")
.option("tidb.port","4000")
.option("tidb.user","root")
.mode("append")
.save()

使用 pytispark (Spark 2.3 2.4)

Clone this wiki locally