External Data Source
The Registration Flow of an External Data Source
DDLParser
Create Temporary Table Using
Parsing of this syntax is handled by DDLParser, which ultimately produces a CreateTableUsing node.
protected lazy val createTable: Parser[LogicalPlan] =
  CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
    case tableName ~ provider ~ opts =>
      CreateTableUsing(tableName, provider, opts)
  }
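For reference, a statement accepted by this parser looks like the following (the table name and path are illustrative):

sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/path/to/data.json')""".stripMargin)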
CreateTableUsing (Logical Plan)
CreateTableUsing extends RunnableCommand and is a logical plan.
CreateTableUsing takes three parameters:
- tableName: the name of the table
- provider: the class (or package) that knows how to read the underlying data
- options: additional parameters needed while reading
private[sql] case class CreateTableUsing(
    tableName: String,
    provider: String,
    options: Map[String, String]) extends RunnableCommand {

  def run(sqlContext: SQLContext) = {
    val loader = Utils.getContextOrSparkClassLoader
    val clazz: Class[_] = try loader.loadClass(provider) catch {
      case cnf: java.lang.ClassNotFoundException =>
        try loader.loadClass(provider + ".DefaultSource") catch {
          case cnf: java.lang.ClassNotFoundException =>
            sys.error(s"Failed to load class for data source: $provider")
        }
    }
    val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
    val relation = dataSource.createRelation(sqlContext, options)
    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
    Seq.empty
  }
}
The run method of CreateTableUsing defines what this logical plan does:
- Load the provider class
- If step 1 fails, load the DefaultSource class inside the provider package (e.g. USING org.apache.spark.sql.json falls back to org.apache.spark.sql.json.DefaultSource)
- Instantiate the loaded class and cast it to org.apache.spark.sql.sources.RelationProvider
- Create a BaseRelation via RelationProvider's createRelation method
- Create and register a SchemaRDD via SQLContext's baseRelationToSchemaRDD method
The baseRelationToSchemaRDD method
- First wraps the baseRelation in a LogicalRelation(baseRelation)
- Then calls logicalPlanToSparkQuery
- Which finally wraps it as SchemaRDD(sqlContext, LogicalRelation(baseRelation))
implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
  logicalPlanToSparkQuery(LogicalRelation(baseRelation))
}

implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
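Since both conversions are implicit members of SQLContext, a BaseRelation can be used anywhere a SchemaRDD is expected once the implicits are in scope. A minimal sketch (dataSource, options, and the table name are stand-ins taken from the run method above):

import sqlContext._  // bring the implicit conversions into scope

val relation: BaseRelation = dataSource.createRelation(sqlContext, options)
val schemaRDD: SchemaRDD = relation  // baseRelationToSchemaRDD applies here
schemaRDD.registerTempTable("myTable")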
ExecutedCommand (Physical Plan)
The CreateTableUsing logical plan is eventually turned into an ExecutedCommand physical plan. Executing the ExecutedCommand invokes CreateTableUsing's run method, which triggers the loading of the provider class and the registration of the SchemaRDD.
case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
  protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)

  override def output = cmd.output
  override def children = Nil
  override def executeCollect(): Array[Row] = sideEffectResult.toArray
  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
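Note that sideEffectResult is a lazy val, so the command body runs at most once, on the first call to executeCollect or execute. A small sketch of that behavior (cmd stands for a resolved CreateTableUsing):

val physical = ExecutedCommand(cmd) // nothing has run yet
physical.executeCollect()           // forces sideEffectResult, i.e. cmd.run(sqlContext)
physical.executeCollect()           // lazy val is cached; run is not invoked again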
Core Classes of External Data Sources
The following introduces the two core classes of the external data source API: RelationProvider and BaseRelation.
RelationProvider
RelationProvider is the entry point of an external data source: to let Spark SQL read a new external data format, you must implement this trait. It has a single method, createRelation, which takes a SQLContext and a map of parameters and returns a BaseRelation (a minimal end-to-end example appears after the BaseRelation subclasses below).
trait RelationProvider {
  /** Returns a new base relation with the given parameters. */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
Take a look at the DefaultSource that Spark SQL ships with for reading JSON data:
- The class implements RelationProvider
- createRelation first reads the parameters path (the data path) and samplingRatio (the sampling ratio)
- It then creates a JSONRelation
private[sql] class DefaultSource extends RelationProvider {
  /** Returns a new base relation with the given parameters. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
    JSONRelation(fileName, samplingRatio)(sqlContext)
  }
}
BaseRelation
Spark SQL uses the BaseRelation class to abstract a data file as a table; the schema of that table is defined in BaseRelation.
abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
  def sizeInBytes = sqlContext.defaultSizeInBytes
}
BaseRelation has four subclasses; to let Spark SQL read a new external data format, you must extend one of the four. All four declare a buildScan method returning RDD[Row], but with different parameters (see the sketch after the code below):
- TableScan is the most basic, taking no parameters
- PrunedScan reads only the requested columns, taking an Array[String] of column names
- PrunedFilteredScan adds row filtering on top of PrunedScan, additionally taking an Array[Filter]
- CatalystScan is similar to PrunedFilteredScan, except that its filters are typed as Expression
abstract class TableScan extends BaseRelation {
  def buildScan(): RDD[Row]
}

abstract class PrunedScan extends BaseRelation {
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}

abstract class PrunedFilteredScan extends BaseRelation {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

abstract class CatalystScan extends BaseRelation {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
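To make the RelationProvider/BaseRelation contract concrete, here is a minimal sketch of a custom source, modeled on the Spark 1.2 API shown above; the package, class names, and the n option are all hypothetical:

package com.example.range // hypothetical package

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, StructType, StructField, IntegerType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}

// Entry point: USING com.example.range falls back to com.example.range.DefaultSource.
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val n = parameters.getOrElse("n", sys.error("Option 'n' not specified")).toInt
    RangeRelation(n)(sqlContext)
  }
}

// The simplest possible relation: a single Int column holding 0 until n.
case class RangeRelation(n: Int)(@transient val sqlContext: SQLContext)
  extends TableScan {
  override val schema = StructType(StructField("i", IntegerType, nullable = false) :: Nil)
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until n).map(Row(_))
}

With these two classes on the classpath, a statement like CREATE TEMPORARY TABLE t USING com.example.range OPTIONS (n '10') would register a ten-row, one-column table.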
Take a look at the JSONRelation that Spark SQL implements for reading JSON data:
- JSONRelation extends TableScan, meaning it performs no column pruning or row filtering
- It first builds baseRDD via SparkContext's textFile
- It then calls JsonRDD.inferSchema to derive the schema from the raw data
- Finally it calls JsonRDD.jsonStringToRow to turn the JSON strings into an RDD[Row]
private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  private def baseRDD = sqlContext.sparkContext.textFile(fileName)

  override val schema =
    JsonRDD.inferSchema(
      baseRDD,
      samplingRatio,
      sqlContext.columnNameOfCorruptRecord)

  override def buildScan() =
    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
}
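Putting the registration and query sides together, end-to-end usage of the JSON source might look as follows (the table name and path are illustrative):

// Register: the DDL parser produces CreateTableUsing, whose run method loads
// org.apache.spark.sql.json.DefaultSource and registers the SchemaRDD.
sqlContext.sql(
  """CREATE TEMPORARY TABLE people
    |USING org.apache.spark.sql.json
    |OPTIONS (path 'examples/people.json')""".stripMargin)

// Query: planned by DataSourceStrategy (next section), which ends up calling
// JSONRelation.buildScan().
sqlContext.sql("SELECT * FROM people").collect().foreach(println)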
Plan Resolution Flow for Queries over External Data Source Tables
DataSourceStrategy (Logical Plan -> Physical Plan)
DataSourceStrategy is the strategy that, during the conversion from logical plan to physical plan, handles external data sources: it matches on the concrete type of the BaseRelation inside a LogicalRelation and builds the corresponding scan.
private[sql] object DataSourceStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>
      pruneFilterProjectRaw(
        l,
        projectList,
        filters,
        (a, f) => t.buildScan(a, f)) :: Nil

    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
      pruneFilterProject(
        l,
        projectList,
        filters,
        (a, f) => t.buildScan(a, f)) :: Nil

    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedScan)) =>
      pruneFilterProject(
        l,
        projectList,
        filters,
        (a, _) => t.buildScan(a)) :: Nil

    case l @ LogicalRelation(t: TableScan) =>
      execution.PhysicalRDD(l.output, t.buildScan()) :: Nil

    case _ => Nil
  }
  ...
}
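As an illustration, suppose t is registered over a relation that extends PrunedFilteredScan and has columns a and b. A query such as the following matches the second case, and pruneFilterProject pushes the projection and the translatable predicates down to the relation (the buildScan arguments shown in the comments are a sketch of what Spark 1.2 would pass, not traced output):

sqlContext.sql("SELECT a FROM t WHERE b > 1")
// pruneFilterProject roughly invokes:
//   t.buildScan(Array("a", "b"), Array(GreaterThan("b", 1)))
// where GreaterThan is org.apache.spark.sql.sources.GreaterThan; columns needed
// only by the filter (b) are still requested, then dropped by a Project on top.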