理解Spark SQL(二)

运用spark sql,除了运用之前介绍的办法,实践上还能够运用sqlcontext或许hivecontext经过编程的方法完成。前者支撑sql语法解析器,后者支撑sql语法解析器和hivesql语法解析器,默以为hivesql语法解析器,用户能够经过装备切换成sql语法解析器来运转hiveql不支撑的语法,如:select 1。实践上hivecontext是sqlcontext的子类,因而在hivecontext运转过程中除了override的函数和变量,能够运用和sqlcontext相同的函数和变量。

由于spark-shell东西实践便是运转的scala程序片段,为了便利,下面选用spark-shell进行演示。

首要来看sqlcontext,由于是规范sql,能够不依赖于hive的metastore,比方下面的比如:

[root@brucecentos4 ~]# $spark_home/bin/spark-shell --master yarn --conf spark.sql.catalogimplementation=in-memory

 

 scala case class offices
defined class offices

scala val rddoffices=sc.textfile.map).map.trim.toint,p,p,p.trim.toint,p.trim.todouble,p.trim.todouble))
rddoffices: org.apache.spark.rdd.rdd[offices] = mappartitionsrdd[3] at map at console :26

scala val officesdataframe = spark.createdataframe
officesdataframe: org.apache.spark.sql.dataframe = [office: int, city: string ... 4 more fields]

scala officesdataframe.createorreplacetempview

scala spark.sql.map).collect.foreach
city: newyork   
city: chicago
city: atlanta

scala



















 履行上面的指令后,实践上在yarn集群中发动了一个yarn client形式的spark application,然后在scala 提示符后输入的句子会生成rdd的transformation,最终一条指令中的collect会生成rdd的action,即会触发job的提交和程序的履行。

指令行中之所以加上--conf spark.sql.catalogimplementation=in-memory选项,是由于spark-shell中的默许发动的sparksession目标spark是默许支撑hive的,不带这个选项发动的话,程序就会去衔接hive metastore,由于这儿并没有发动hive metastore,因而程序在履行createdataframe函数时会报错。

程序中的榜首行是1个case class句子,这儿是界说后边的数据文件的形式的。第二行从hdfs中读取一个文本文件,并工经过map映射到了形式上面。第三行根据第二行的rdd生成dataframe,第四行根据第三行的dataframe注册了一个逻辑上的暂时表,最终一行就能够经过sparksession的sql函数来履行sql句子了。

实践上,sqlcontext是spark 1.x中的sql进口,在spark 2.x中,运用sparksession作为sql的进口,可是为了向后兼容,spark 2.x依然支撑sqlcontext来操作sql,不过会提示deprecated,所以上面的比如是选用spark 2.x中的写法。

实践上还有别的一种办法来操作sql,针对相同的数据,例如:

scala import org.apache.spark.sql._
import org.apache.spark.sql._

scala import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala val schema = new structtype, structfield, structfield, structfield, structfield, structfield))
schema: org.apache.spark.sql.types.structtype = structtype, structfield, structfield, structfield, structfield, structfield)

scala val rowrdd = sc.textfile.map).map.trim.toint,p,p,p.trim.toint,p.trim.todouble,p.trim.todouble))
rowrdd: org.apache.spark.rdd.rdd[org.apache.spark.sql.row] = mappartitionsrdd[3] at map at console :30

scala val dataframe = spark.createdataframe
dataframe: org.apache.spark.sql.dataframe = [office: int, city: string ... 4 more fields]

scala dataframe.createorreplacetempview

scala spark.sql.map).collect.foreach
city: newyork   
city: chicago
city: atlanta























这个比如与之前的比如有一些不同,首要的当地有3个:

1. 之前的比如是选用case class界说形式,spark选用反射来揣度schema;而这个比如选用structtype类型的目标来界说形式,它接纳一个数组,数组成员是structfield目标,代表一个字段的界说,每个字段的界说由字段称号、字段类型和是否答应为空组成;

2. 关于代表数据的rdd,之前的比如是直接用case class界说的类型来切割字段,而这个比如是用的row类型;

3. 在运用createdataframe函数生成dataframe时,该函数的参数不相同,之前的比如只需传入rdd目标即可,而这个比如需求一起传入rdd和界说的schema;

实践编程中主张选用第二种办法,由于其愈加灵敏,schema信息能够不必是写死的,而是能够在程序运转的过程中生成。

 

下面接着来看hivecontext的用法,运用hivecontext之前需求保证:

Back to Top
风格切换
颜色选择