使用Mongodb

Spark使用Mongodb的方法,使用mongodb官方提供的connector

使用方式

開始之前先import

import com.mongodb.spark.sql._

Read

val mongoDataFrame1 = spark.read
  .option("database", "IEDB")
  .option("collection", "EHAVE_TBL_2016")
  .mongo()

Schema

如果沒有指定Schema的話,mongo driver會自動判斷。但是有時候會壞掉!所以還是不要偷懶!

// 用case class定義schema
case class EHAVE_TBL(
    CNAME: String, CCOVEY: String, ISCORE: String,
    IFPASS: String, FACTORY: String, ISEX: String
)

val mongoDataFrame3 = spark.read
    .option("database", "IEDB")
    .option("collection", "EHAVE_TBL_2016")
    // 指定schema
    .mongo[EHAVE_TBL]()

Write

overwrite模式參考以下文件的”Save DataFrames to MongoDB”

mongoDataFrame3.write
  .option("database", "spark_tmp")
  .option("collection", "passCountByFactorySex")
  // 確認是否要overwrite
  //.mode("overwrite")
  .mongo()

DataFrameReader

其實上面範例中用的是spark.sql.DataFrameReaderDataFrameWriter,在眾多連線語法中,我覺得這個最可靠(該有的設定都有,語法簡潔)

DataFrameReader.mongo()

上面使用DataFrameReader建立DataFrame,.mongo().format("com.mongodb.spark.sql").load()的縮寫

uri呢?

一般連mongo會有一個uri,例如

mongodb://192.168.2.201:27017,192.168.2.203:27017,192.168.2.203:27017/?replicaSet=bigdata&readPreference=secondaryPreferred

用來指定mongod的位置,資料庫,或者其他選項,這邊沒有指定uri是因為把預設值寫在 spark.mongodb.input.urispark.mongodb.output.uri 了,詳情請看Spark Interpreter Setting

more!