Environment
- Spark 2.3.2
- standalone (local) test
mongo version >= 3.2
```
spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2
```
mongo version < 3.2
- When the MongoDB version is below 3.2, the following error is reported.
```
ERROR partitioner.DefaultMongoPartitioner:
----------------------------------------
WARNING: MongoDB version < 3.2 detected.
----------------------------------------
With legacy MongoDB installations you will need to explicitly configure the Spark Connector with a partitioner.
This can be done by:
* Setting a "spark.mongodb.input.partitioner" in SparkConf.
* Setting in the "partitioner" parameter in ReadConfig.
* Passing the "partitioner" option to the DataFrameReader.
The following Partitioners are available:
* MongoShardedPartitioner - for sharded clusters, requires read access to the config database.
* MongoSplitVectorPartitioner - for single nodes or replicaSets. Utilises the SplitVector command on the primary.
* MongoPaginateByCountPartitioner - creates a specific number of partitions. Slow as requires a query for every partition.
* MongoPaginateBySizePartitioner - creates partitions based on data size. Slow as requires a query for every partition.
```
- Choose a partitioner that matches your MongoDB deployment and pass it to spark-shell or spark-submit via --conf, because setting it after you are already inside spark-shell does not take effect (a per-read alternative is sketched after the command below).
- spark.mongodb.input.sampleSize=10000000: the number of documents Spark samples when inferring the schema. With a small sample I ran into the inferred schema changing from run to run.
```
spark-shell \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2 \
--conf spark.mongodb.input.partitioner=MongoPaginateByCountPartitioner \
--conf spark.mongodb.input.sampleSize=10000000
```
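The connector's error message above also allows passing these settings per read instead of via --conf; a minimal sketch, assuming the short option keys "partitioner" and "sampleSize" are accepted by the DataFrameReader (values are examples, placeholders in the uri need replacing):
```
// Per-read configuration instead of --conf; the "partitioner" key is listed in the connector's error message.
val df = spark.read.format("com.mongodb.spark.sql")
  .option("uri", "mongodb://$username:$password@$host:$port/$db")
  .option("collection", "user_interest")
  .option("partitioner", "MongoPaginateByCountPartitioner")
  .option("sampleSize", "10000000")   // assumed short key for spark.mongodb.input.sampleSize
  .load()
```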
Code
- Use the :paste command in spark-shell and paste the code below.
```
import com.mongodb.spark._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.getOrCreate()
// replace the placeholders with your own values
val uri = "mongodb://$username:$password@$host:$port/$db?connectTimeoutMS=10000&socketTimeoutMS=30000&readPreference=secondaryPreferred"
val collection = "user_interest"
val df = spark.read.format("com.mongodb.spark.sql")
.option("uri", uri)
.option("collection", collection)
.load()
// inspect the inferred schema
df.printSchema()
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
val hdfs_path = "/warehouse/hive/$db.db/$collection"
// you can either save as a Spark/Hive table or write directly to HDFS
df.write.mode(SaveMode.Overwrite).orc(hdfs_path)
```
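The comment above notes that saving as a Spark/Hive table is also an option instead of writing ORC files to the warehouse path. A minimal sketch, assuming the SparkSession was built with enableHiveSupport() and the target database already exists:
```
// Write the DataFrame as a managed Hive table instead of raw ORC files on HDFS.
// Assumes enableHiveSupport() on the SparkSession and that the database exists.
df.write
  .mode(SaveMode.Overwrite)
  .format("orc")
  .saveAsTable("$db.user_interest")   // replace with your database and table name
```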
Submitting from the production scheduler
Two extra jars need to be shipped with the job, or you can build an assembly (fat) package instead (see the build sketch after this list):
- mongo-spark-connector_2.11-2.3.2.jar
- mongo-java-driver-3.8.2.jar
- spark-mongo-hive_2.11-0.1.jar // packaged yourself
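If you prefer the assembly route, here is a minimal build.sbt sketch for sbt-assembly; the project name and versions mirror the jars listed above, and the exact dependency set is an assumption about the original build:
```
// build.sbt sketch (assumes the sbt-assembly plugin is declared in project/plugins.sbt)
name := "spark-mongo-hive"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"             % "2.3.2" % Provided,
  "org.apache.spark"  %% "spark-hive"            % "2.3.2" % Provided,
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.3.2"   // pulls in mongo-java-driver transitively
)
```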
spark-submit example
```
spark-submit \
--class com.xuchuanhua.sync.MongoToHiveWithSpark \
--jars ../../jar/mongo-spark-connector_2.11-2.3.2.jar,../../jar/mongo-java-driver-3.8.2.jar \
--master yarn \
--deploy-mode client \
--queue offline \
--executor-memory 1G \
--driver-memory 1G \
--num-executors 10 \
--executor-cores 4 \
--driver-cores 4 \
../../jar/spark-mongo-hive_2.11-0.1.jar $1 $2 $3
```
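For reference, a minimal sketch of what the packaged main class could look like; the class name comes from the command above, but the meaning of the three positional arguments (here taken as MongoDB uri, collection, and HDFS output path) is an assumption:
```
package com.xuchuanhua.sync

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch of the job driver. The argument order (uri, collection, output path) is assumed,
// not taken from the original project.
object MongoToHiveWithSpark {
  def main(args: Array[String]): Unit = {
    val Array(uri, collection, hdfsPath) = args

    val spark = SparkSession.builder()
      .appName("MongoToHiveWithSpark")
      .getOrCreate()

    val df = spark.read.format("com.mongodb.spark.sql")
      .option("uri", uri)
      .option("collection", collection)
      .load()

    df.write.mode(SaveMode.Overwrite).orc(hdfsPath)

    spark.stop()
  }
}
```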
Related issues
com.mongodb.MongoSocketReadTimeoutException: Timeout while receiving message
- Increase socketTimeoutMS in the connection URI, e.g. socketTimeoutMS=30000.
Reading from a secondary
- Add readPreference=secondaryPreferred to the connection URI.