使用Mongodb
Spark使用Mongodb的方法,使用mongodb官方提供的connector
使用方式
開始之前先import
import com.mongodb.spark.sql._
Read
val mongoDataFrame1 = spark.read
.option("database", "IEDB")
.option("collection", "EHAVE_TBL_2016")
.mongo()
Schema
如果沒有指定Schema的話,mongo driver會自動判斷。但是有時候會壞掉!所以還是不要偷懶!
// 用case class定義schema
case class EHAVE_TBL(
CNAME: String, CCOVEY: String, ISCORE: String,
IFPASS: String, FACTORY: String, ISEX: String
)
val mongoDataFrame3 = spark.read
.option("database", "IEDB")
.option("collection", "EHAVE_TBL_2016")
// 指定schema
.mongo[EHAVE_TBL]()
Write
overwrite模式參考以下文件的”Save DataFrames to MongoDB”
mongoDataFrame3.write
.option("database", "spark_tmp")
.option("collection", "passCountByFactorySex")
// 確認是否要overwrite
//.mode("overwrite")
.mongo()
DataFrameReader
其實上面範例中用的是spark.sql.DataFrameReader
與DataFrameWriter
,在眾多連線語法中,我覺得這個最可靠(該有的設定都有,語法簡潔)
DataFrameReader.mongo()
上面使用DataFrameReader建立DataFrame,.mongo()
是.format("com.mongodb.spark.sql").load()
的縮寫
uri呢?
一般連mongo會有一個uri,例如
mongodb://192.168.2.201:27017,192.168.2.203:27017,192.168.2.203:27017/?replicaSet=bigdata&readPreference=secondaryPreferred
用來指定mongod的位置,資料庫,或者其他選項,這邊沒有指定uri是因為把預設值寫在 spark.mongodb.input.uri 與 spark.mongodb.output.uri 了,詳情請看Spark Interpreter Setting