A roundup of ways to sync MongoDB data to Hive with Spark

Environment

  • spark 2.3.2

Local test

  1. mongo version >= 3.2

    spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2 
  2. mongo version < 3.2

  • When the MongoDB version is below 3.2, the load fails with the following error.
    ERROR partitioner.DefaultMongoPartitioner:
    ----------------------------------------
    WARNING: MongoDB version < 3.2 detected.
    ----------------------------------------
    With legacy MongoDB installations you will need to explicitly configure the Spark Connector with a partitioner.
    This can be done by:
    * Setting a "spark.mongodb.input.partitioner" in SparkConf.
    * Setting in the "partitioner" parameter in ReadConfig.
    * Passing the "partitioner" option to the DataFrameReader.
    The following Partitioners are available:
    * MongoShardedPartitioner - for sharded clusters, requires read access to the config database.
    * MongoSplitVectorPartitioner - for single nodes or replicaSets. Utilises the SplitVector command on the primary.
    * MongoPaginateByCountPartitioner - creates a specific number of partitions. Slow as requires a query for every partition.
    * MongoPaginateBySizePartitioner - creates partitions based on data size. Slow as requires a query for every partition.
  • Choose a partitioner that fits your MongoDB deployment and pass it as --conf to spark-shell or spark-submit; setting it after you are already inside the shell does not take effect. (Alternatively, the partitioner can be passed per read, as sketched after this list.)
  • spark.mongodb.input.sampleSize=10000000 sets how many documents Spark samples when inferring the schema; with a small sample I ran into the inferred schema changing from run to run.
    spark-shell   \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2 \
    --conf spark.mongodb.input.partitioner=MongoPaginateByCountPartitioner \
    --conf spark.mongodb.input.sampleSize=10000000
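
  • If you are already inside spark-shell, the error message above also allows passing the partitioner straight to the DataFrameReader (the connector still has to be on the classpath via --packages). A minimal sketch, with placeholder connection details:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()

    // "partitioner" is set per read instead of via --conf, so it works after the shell has started
    val df = spark.read.format("com.mongodb.spark.sql")
      .option("uri", "mongodb://$username:$password@$host:$port/$db")
      .option("collection", "user_interest")
      .option("partitioner", "MongoPaginateByCountPartitioner")
      .load()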

Code

  • Use the :paste command in spark-shell and paste in the block below.
    import com.mongodb.spark._
    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder()
      .master("local")
      .getOrCreate()

    // Replace the placeholders with your own connection details
    val uri = "mongodb://$username:$password@$host:$port/$db?connectTimeoutMS=10000&socketTimeoutMS=30000&readPreference=secondaryPreferred"

    val collection = "user_interest"
    val df = spark.read.format("com.mongodb.spark.sql")
      .option("uri", uri)
      .option("collection", collection)
      .load()

    // Inspect the inferred schema
    df.printSchema()

    val hdfs_path = "/warehouse/hive/$db.db/$collection"
    // Either register a Spark/Hive table or write ORC files straight to HDFS (see the saveAsTable sketch below)
    df.write.mode(SaveMode.Overwrite).orc(hdfs_path)
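
  • The snippet above only writes ORC files into the warehouse directory. If you would rather have Spark register the result as a Hive table directly, saveAsTable is the usual route; a minimal sketch, assuming the SparkSession is built with .enableHiveSupport() and the target database already exists (my_db.user_interest is a placeholder name):

    // Continuing from the `df` loaded above
    import org.apache.spark.sql.SaveMode

    df.write
      .mode(SaveMode.Overwrite)
      .format("orc")
      .saveAsTable("my_db.user_interest")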

Submitting from a production scheduler

  • Two extra jars need to be shipped along with the application jar (alternatively, build a single assembly package):

    • mongo-spark-connector_2.11-2.3.2.jar
    • mongo-java-driver-3.8.2.jar
    • spark-mongo-hive_2.11-0.1.jar // the application jar you package yourself; a sketch of its main class follows the spark-submit example
  • spark-submit example

    spark-submit \
    --class com.xuchuanhua.sync.MongoToHiveWithSpark \
    --jars ../../jar/mongo-spark-connector_2.11-2.3.2.jar,../../jar/mongo-java-driver-3.8.2.jar \
    --master yarn \
    --deploy-mode client \
    --queue offline \
    --executor-memory 1G \
    --driver-memory 1G \
    --num-executors 10 \
    --executor-cores 4 \
    --driver-cores 4 \
    ../../jar/spark-mongo-hive_2.11-0.1.jar $1 $2 $3
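
  • The post does not show the code inside spark-mongo-hive_2.11-0.1.jar, so the following is only a rough sketch of what the submitted class could look like, assuming the three positional arguments ($1 $2 $3) are the database, the collection, and the target HDFS path. It is essentially the interactive snippet above wrapped in a main method.

    package com.xuchuanhua.sync

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object MongoToHiveWithSpark {
      def main(args: Array[String]): Unit = {
        val db         = args(0)   // assumed: source database
        val collection = args(1)   // assumed: source collection
        val hdfsPath   = args(2)   // assumed: target HDFS path

        val spark = SparkSession.builder()
          .appName(s"mongo-to-hive-$collection")
          .getOrCreate()

        // Placeholder URI: replace credentials/host or load them from configuration
        val uri = s"mongodb://USER:PASS@HOST:PORT/$db?socketTimeoutMS=30000&readPreference=secondaryPreferred"

        val df = spark.read.format("com.mongodb.spark.sql")
          .option("uri", uri)
          .option("collection", collection)
          .load()

        df.write.mode(SaveMode.Overwrite).orc(hdfsPath)

        spark.stop()
      }
    }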

Troubleshooting

com.mongodb.MongoSocketReadTimeoutException: Timeout while receiving message

  • Increase socketTimeoutMS in the connection URI, e.g. socketTimeoutMS=30000.

Reading from secondaries: readPreference=secondaryPreferred

  • Add readPreference=secondaryPreferred to the connection URI so reads go to secondary members instead of the primary.
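
  • If you prefer not to encode the read preference in the URI, the connector also accepts it through its read configuration. A minimal sketch (option name taken from the 2.x connector's input configuration; connection details are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()

    val df = spark.read.format("com.mongodb.spark.sql")
      .option("uri", "mongodb://$username:$password@$host:$port/$db")
      .option("collection", "user_interest")
      .option("readPreference.name", "secondaryPreferred")  // same effect as the URI parameter
      .load()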
