


  /**
   * Executes a SQL query using Spark, returning the result as a SchemaRDD.  The dialect that is
   * used for SQL parsing can be configured with 'spark.sql.dialect'.
   *
   * @group userf
   */
  def sql(sqlText: String): SchemaRDD = {
    // Only the "sql" dialect is handled: the text is parsed into a logical plan, and that plan
    // is wrapped in a new SchemaRDD bound to this context.
    if (dialect == "sql") {
      new SchemaRDD(this, parseSql(sqlText))
    } else {
      // Any other configured dialect is rejected; sys.error throws a RuntimeException.
      sys.error(s"Unsupported SQL dialect: $dialect")

protected[sql] val sqlParser = {
    // The catalyst SqlParser is handed to SparkSQLParser as a parse function:
    // `fallback(_)` is placeholder syntax for `s => fallback(s)`, i.e. a call to SqlParser's apply.
    // NOTE(review): presumably SparkSQLParser handles Spark-specific statements itself and
    // delegates everything else to this fallback — confirm against SparkSQLParser.
    val fallback = new catalyst.SqlParser
    new catalyst.SparkSQLParser(fallback(_))

/**
 * An RDD of [[Row]]s that also carries the logical plan it was built from.
 *
 * It is created on the `sqlContext`'s SparkContext with an empty (`Nil`) dependency list;
 * query-execution members come from the mixed-in [[SchemaRDDLike]] trait.
 *
 * @param sqlContext      the SQLContext this RDD belongs to; `@transient`, so it is not
 *                        serialized together with the RDD
 * @param baseLogicalPlan the logical plan this RDD represents; also `@transient`
 */
class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val baseLogicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike


   // Parses SQL text into a LogicalPlan.
   // NOTE(review): body not visible here; presumably it delegates to sqlParser and returns an
   // unresolved plan, as the surrounding text states — confirm.
   protected[sql] def parseSql(sql: String): LogicalPlan = {

然后，SchemaRDD通过SchemaRDDLike中的lazy val queryExecution调用sqlContext.executePlan(baseLogicalPlan)，把catalyst.SqlParser解析后生成的Unresolved LogicalPlan封装进QueryExecution，以便后续执行。

/**
 * Members shared by SchemaRDD-style classes: the owning SQLContext, the logical plan, and a
 * lazily created query execution for that plan.
 */
private[sql] trait SchemaRDDLike {
  @transient def sqlContext: SQLContext
  @transient val baseLogicalPlan: LogicalPlan

  private[sql] def baseSchemaRDD: SchemaRDD
  // lazy val: executePlan is invoked on first access only, and at most once per instance.
  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)


/**
 * Wraps `plan` in a fresh [[QueryExecution]] whose `logical` input is that plan.
 *
 * No work is done here; the execution phases are computed on demand by the returned object.
 *
 * @param plan the logical plan to execute
 * @return a new QueryExecution bound to `plan`
 */
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = {
  val execution = new this.QueryExecution {
    val logical: LogicalPlan = plan
  }
  execution
}


  /**
   * :: DeveloperApi ::
   * The primary workflow for executing relational queries using Spark.  Designed to allow easy
   * access to the intermediate phases of query execution for developers.
   */
  protected abstract class QueryExecution {
    // The input plan; every phase below is a lazy val, computed on first access and at most once.
    def logical: LogicalPlan

    // Resolve the logical plan with the analyzer, then pass the result through ExtractPythonUdfs.
    lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
    // NOTE(review): presumably substitutes cached data for matching sub-plans — confirm
    // against useCachedData.
    lazy val withCachedData = useCachedData(analyzed)
    // Run the optimizer on the (possibly cache-substituted) analyzed plan.
    lazy val optimizedPlan = optimizer(withCachedData)

    // TODO: Don't just pick the first one...
    lazy val sparkPlan = {
    // executedPlan should not be used to initialize any SparkPlan. It should be
    // only used for execution.
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()


  1. 使用analyzer结合数据字典(catalog)进行绑定，生成resolved LogicalPlan
  2. 使用ExtractPythonUdfs处理Python UDF
  3. 使用useCachedData处理缓存数据(Cache)
  4. 使用optimizer对resolved LogicalPlan进行优化，生成optimized LogicalPlan
  5. 将optimized LogicalPlan转换成物理计划sparkPlan(类型为SparkPlan)
  6. 使用prepareForExecution()将物理计划转换成可执行物理计划executedPlan
  7. 使用execute()执行可执行物理计划，得到RDD[Row](toRdd)
  8. 生成SchemaRDD

