如果需要运行在Yarn上,Yarn所有机器上需要安装R环境。
SparkR Shell是一个交互式命令行,用户可以输入R代码,进行交互式操作。 SparkR有两种模式:Local模式和Yarn-Client模式。
Local模式下,任务将会运行在本地机器。
$SPARK_HOME/bin/sparkR --master local[*]
Yarn-Client模式下,Spark的Driver运行在本地机器,Executor运行在Yarn的节点上。
$SPARK_HOME/bin/sparkR --master yarn-client --num-executors 2
通过Spark Submit,用户可以把R的任务提交到集群上运行。 Spark Submit有三种模式:Local模式,Yarn-Client模式以及Yarn-Cluser模式。 其中Local模式和Yarn-Client模式同SparkR Shell,Yarn-Cluser模式中Executor和Driver都运行在Yarn节点。 Local模式和Yarn-Client模式适合调试,Yarn-Cluser模式适合生产环境。
$SPARK_HOME/bin/spark-submit --master yarn-cluster --num-executors 2 /path/to/test.R
用SparkR Shell调用R的函数install.packages
下载(只需下载一次)需要依赖的External Package,
这些Package会自动下载到$SPARK_HOME/R/lib目录下。
$SPARK_HOME/bin/sparkR --master local[*]
install.packages("matlab", repos = "http://mirror.bjtu.edu.cn/cran")
Spark会自动把$SPARK_HOME/R/lib
下的R Packages打包成$SPARK_HOME/R/lib/sparkr.zip
并上传到相应的Yarn结点。
$SPARK_HOME/bin/spark-submit \
--master yarn-cluster \
--num-executors 2 \
/path/to/example.R
由于SparkR Package中没有export RDD相关的函数,所以只能使用R的语法:::
来调用RDD API。
# Word Count
library(SparkR)
sc <- sparkR.init()
path <- "/tmp/test.txt"
# RDD[String]
rdd <- SparkR:::textFile(sc, path)
# RDD[String]
words <- SparkR:::flatMap(rdd, function(line) {
strsplit(line, " ")[[1]]
})
# RDD[(String, Int)]
wordCount <- SparkR:::lapply(words, function(word) {
list(word, 1L)
})
# RDD[(String, Int)]
counts <- SparkR:::reduceByKey(wordCount, "+", 2L)
SparkR:::collect(counts)
如果在SparkR:::lapply
和SparkR:::lapplyPartition
等函数中使用了R Packages,
这就需要让Spark在执行器上加载相应的R Packages,SparkR提供了SparkR:::includePackage
函数来完成这个功能。
# Use Matrix Package
library(SparkR)
sc <- sparkR.init()
# import Matrix Package on Executors
SparkR:::includePackage(sc, Matrix)
# RDD[Int]
rdd <- SparkR:::parallelize(sc, 1 : 10, 2)
# function: map x to a Matrix
generateSparse <- function(x) {
sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
}
# RDD[Matrix]
sparseMat <- SparkR:::lapplyPartition(rdd, generateSparse)
SparkR:::collect(sparseMat)
install.packages
是R中用来动态安装R Packages的函数,在SparkR中调用这个函数会把对应的R Packages下载到
$SPARK_HOME/R/lib
目录,如果运行在Yarn模式下,lib目录下所有的R Packages会一起打包成
library(SparkR)
install.packages(“matlab”, repos = “http://mirror.bjtu.edu.cn/cran”)
sc <- sparkR.init() library(matlab)
SparkR:::includePackage(sc, matlab)
rdd <- SparkR:::parallelize(sc, 1 : 10, 2)
generateOnes <- function(x) { ones(3) }
onesRDD <- SparkR:::lapplyPartition(rdd, generateOnes)
SparkR:::collect(onesRDD) ```