Keywords: reading HBase with SparkSQL, SparkSQL custom external data sources
A previous article covered using SparkSQL to operate on HBase tables through Hive.
Starting with version 1.2, SparkSQL supports custom external data sources (External DataSource), so you can implement your own data source against the public API. Based on Spark 1.4.0, this article walks through a custom SparkSQL external data source that reads an HBase table.
The test table in HBase is created and populated as follows:
create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}

put 'lxw1234','lxw1234.com','f1:c1','name1'
put 'lxw1234','lxw1234.com','f1:c2','name2'
put 'lxw1234','lxw1234.com','f2:c1','age1'
put 'lxw1234','lxw1234.com','f2:c2','age2'
put 'lxw1234','lxw1234.com','f3:c1','job1'
put 'lxw1234','lxw1234.com','f3:c2','job2'
put 'lxw1234','lxw1234.com','f3:c3','job3'

hbase(main):025:0* scan 'lxw1234'
ROW            COLUMN+CELL
 lxw1234.com   column=f1:c1, timestamp=1435624625198, value=name1
 lxw1234.com   column=f1:c2, timestamp=1435624591717, value=name2
 lxw1234.com   column=f2:c1, timestamp=1435624608759, value=age1
 lxw1234.com   column=f2:c2, timestamp=1435624635261, value=age2
 lxw1234.com   column=f3:c1, timestamp=1435624662282, value=job1
 lxw1234.com   column=f3:c2, timestamp=1435624697028, value=job2
 lxw1234.com   column=f3:c3, timestamp=1435624697065, value=job3
Start spark-shell:
sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077
Run the following code:
import sqlContext._

var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  "hbase_table_name" -> "lxw1234",
  "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"
)).load()

// sparksql_table_schema: the table definition in SparkSQL.
// hbase_table_name: the table name in HBase.
// hbase_table_schema: the column families and columns of the HBase table to be
// mapped into the SparkSQL table; the mapped fields must match those defined in
// sparksql_table_schema, including their order.

scala> hbasetable.printSchema()
root
 |-- row_key: string (nullable = false)
 |-- c1: string (nullable = false)
 |-- c2: string (nullable = false)
 |-- c3: string (nullable = false)

hbasetable.registerTempTable("lxw1234")

sqlContext.sql("SELECT * from lxw1234").collect
res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])

sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect
res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])
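The relation also accepts an optional row_range parameter of the form "startRow->endRow", which restricts the scan to a row-key range; it defaults to "->", i.e. an unbounded scan (see HBaseRelation.scala below). A minimal sketch, with made-up row keys purely for illustration:

var hbasetableRange = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  "hbase_table_name" -> "lxw1234",
  "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )",
  "row_range" -> "a->z"  // made-up start/end row keys for illustration
)).load()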
Source code
HBaseRelation.scala
package com.lxw1234.sparksql.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types._

import scala.collection.immutable.Map
import scala.collection.mutable.ArrayBuffer

object Resolver extends Serializable {

  def resolve(hbaseField: HBaseSchemaField, result: Result): Any = {
    val cfColArray = hbaseField.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    // An empty column family with column name "key" denotes the row key;
    // anything else is resolved as a regular column.
    if (cfName == "" && colName == "key") {
      resolveRowKey(result, hbaseField.fieldType)
    } else {
      resolveColumn(result, cfName, colName, hbaseField.fieldType)
    }
  }

  def resolveRowKey(result: Result, resultType: String): Any = resultType match {
    case "string" => result.getRow.map(_.toChar).mkString
    case "int"    => result.getRow.map(_.toChar).mkString.toInt
    case "long"   => result.getRow.map(_.toChar).mkString.toLong
  }

  def resolveColumn(result: Result, columnFamily: String, columnName: String, resultType: String): Any = resultType match {
    case "string" => result.getValue(columnFamily.getBytes, columnName.getBytes).map(_.toChar).mkString
    case "int"    => result.getValue(columnFamily.getBytes, columnName.getBytes).map(_.toChar).mkString.toInt
    case "long"   => result.getValue(columnFamily.getBytes, columnName.getBytes).map(_.toChar).mkString.toLong
  }
}

/**
 * Example DDL:
 * val hbaseDDL = s"""
 *   |CREATE TEMPORARY TABLE hbase_people
 *   |USING com.lxw1234.sparksql.hbase
 *   |OPTIONS (
 *   |  sparksql_table_schema '(row_key string, name string, age int, job string)',
 *   |  hbase_table_name 'people',
 *   |  hbase_table_schema '(:key , profile:name , profile:age , career:job )'
 *   |)""".stripMargin
 */
case class HBaseRelation(@transient val hbaseProps: Map[String, String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with Serializable with TableScan {

  val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  val rowRange = hbaseProps.getOrElse("row_range", "->")

  // Get the start row and end row, e.g. "startRow->endRow".
  val range = rowRange.split("->", -1)
  val startRowKey = range(0).trim
  val endRowKey = range(1).trim

  val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) // temporary fields, no types filled in yet; do not use directly
  val registerTableFields = extractRegisterSchema(registerTableSchema)
  val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields, registerTableFields)

  val hbaseTableFields = feedTypes(tempFieldRelation)
  val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields, registerTableFields)
  val queryColumns = getQueryTargetColumns(hbaseTableFields)

  // Copy the SparkSQL field types onto the HBase fields.
  def feedTypes(mapping: Map[HBaseSchemaField, RegisteredSchemaField]): Array[HBaseSchemaField] = {
    val hbaseFields = mapping.map { case (k, v) =>
      k.copy(fieldType = v.fieldType)
    }
    hbaseFields.toArray
  }

  def isRowKey(field: HBaseSchemaField): Boolean = {
    val cfColArray = field.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    cfName == "" && colName == "key"
  }

  // e.g. "f1:col1 f1:col2 f1:col3 f2:col1"
  def getQueryTargetColumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
    val str = ArrayBuffer[String]()
    hbaseTableFields.foreach { field =>
      if (!isRowKey(field)) {
        str += field.fieldName
      }
    }
    str.mkString(" ")
  }

  lazy val schema = {
    val fields = hbaseTableFields.map { field =>
      val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
      val relatedType = field.fieldType match {
        case "string" => SchemaType(StringType, nullable = false)
        case "int"    => SchemaType(IntegerType, nullable = false)
        case "long"   => SchemaType(LongType, nullable = false)
      }
      StructField(name, relatedType.dataType, relatedType.nullable)
    }
    StructType(fields)
  }

  def tableSchemaFieldMapping(externalHBaseTable: Array[HBaseSchemaField],
                              registerTable: Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
    if (externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
    externalHBaseTable.zip(registerTable).toMap
  }

  /**
   * The Spark SQL schema to be registered,
   * e.g. registerTableSchema '(rowkey string, value string, column_a string)'
   */
  def extractRegisterSchema(registerTableSchema: String): Array[RegisteredSchemaField] = {
    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map { fieldString =>
      val splitedField = fieldString.split("\\s+", -1)
      RegisteredSchemaField(splitedField(0), splitedField(1))
    }
  }

  // externalTableSchema, e.g. '(:key , f1:col1 )'
  def extractHBaseSchema(externalTableSchema: String): Array[HBaseSchemaField] = {
    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map(fieldString => HBaseSchemaField(fieldString, ""))
  }

  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  lazy val buildScan = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    // Map each HBase Result to a Row, resolving fields in schema order.
    hbaseRdd.map(_._2).map { result =>
      val values = new ArrayBuffer[Any]()
      hbaseTableFields.foreach { field =>
        values += Resolver.resolve(field, result)
      }
      Row.fromSeq(values.toSeq)
    }
  }

  private case class SchemaType(dataType: DataType, nullable: Boolean)
}
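To make the schema mapping concrete, here is how the two schema strings from the spark-shell example line up after parsing. The intermediate values below are traced by hand from the code above, not captured output:

// hbase_table_schema:    "(:key , f1:c2 , f2:c2 , f3:c3 )"
// sparksql_table_schema: "(row_key string, c1 string, c2 string, c3 string)"
//
// extractHBaseSchema and extractRegisterSchema split each string on "," and
// tableSchemaFieldMapping zips the results positionally, so feedTypes copies
// each SparkSQL type onto the matching HBase field:
//   HBaseSchemaField(":key",  "string")  <->  RegisteredSchemaField("row_key", "string")
//   HBaseSchemaField("f1:c2", "string")  <->  RegisteredSchemaField("c1", "string")
//   HBaseSchemaField("f2:c2", "string")  <->  RegisteredSchemaField("c2", "string")
//   HBaseSchemaField("f3:c3", "string")  <->  RegisteredSchemaField("c3", "string")
//
// getQueryTargetColumns then drops the row-key field and yields the
// SCAN_COLUMNS string "f1:c2 f2:c2 f3:c3" that buildScan hands to TableInputFormat.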
DefaultSource.scala
package com.lxw1234.sparksql.hbase

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.RelationProvider

// Entry point of the data source API: when .format("com.lxw1234.sparksql.hbase")
// (or USING com.lxw1234.sparksql.hbase) is specified, Spark instantiates the
// class named DefaultSource in that package.
class DefaultSource extends RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    HBaseRelation(parameters)(sqlContext)
  }
}
package.scala
package com.lxw1234.sparksql

import org.apache.spark.sql.SQLContext

import scala.collection.immutable.HashMap

package object hbase {

  abstract class SchemaField extends Serializable

  case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class Parameter(name: String)

  protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  protected val ROW_RANGE = Parameter("row_range")

  /**
   * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in an HBase table.
   */
  implicit class HBaseContext(sqlContext: SQLContext) {
    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
      var params = new HashMap[String, String]
      params += (SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
      params += (HBASE_TABLE_NAME.name -> hbaseTableName)
      params += (HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
      // Start row and end row of the scan, e.g. "startRow->endRow".
      params += (ROW_RANGE.name -> rowRange)
      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext))
    }
  }
}
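With the implicit HBaseContext class above in scope, the same table can be loaded without writing out the options map by hand. A small sketch of the equivalent call (it simply builds the identical parameter map internally):

import com.lxw1234.sparksql.hbase._

val df = sqlContext.hbaseTable(
  "(row_key string, c1 string, c2 string, c3 string)",  // sparksql_table_schema
  "lxw1234",                                            // hbase_table_name
  "(:key , f1:c2 , f2:c2 , f3:c3 )"                     // hbase_table_schema
)
df.registerTempTable("lxw1234")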
Related configuration and notes
- Ordinarily, the SparkSQL syntax for creating a table over an external data source is:
CREATE TEMPORARY TABLE hbasetable
USING com.lxw1234.sparksql.hbase
OPTIONS (
sparksql_table_schema '(row_key string, c1 string, c2 string, c3 string)',
hbase_table_name 'lxw1234',
hbase_table_schema '(:key , f1:c2 , f2:c2 , f3:c3)'
)
This DDL fails on my Spark 1.4: the statement is handed off to Hive's syntax parser, and since Hive 0.13 has no such syntax, an error is thrown.
Could this be because my Spark 1.4 package was compiled with Hive support?
- Compiling the source code above requires the following HBase jars:
hbase-client-0.96.1.1-cdh5.0.0.jar
hbase-common-0.96.1.1-cdh5.0.0.jar
hbase-protocol-0.96.1.1-cdh5.0.0.jar
hbase-server-0.96.1.1-cdh5.0.0.jar
At runtime, the HBase cluster settings are also needed:
hbase.zookeeper.quorum
hbase.client.scanner.caching
I had already added these jars and settings to the Spark cluster's CLASSPATH during an earlier setup (see http://lxw1234.com/archives/2015/07/330.htm); a sketch of setting them programmatically instead appears after this list.
- This program was originally developed by OopsOutOfMemory for Spark 1.2; I only made minor modifications.
https://github.com/OopsOutOfMemory/spark-sql-hbase
- This program is intended for learning and testing only; its performance has not been evaluated.
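If you would rather not depend on hbase-site.xml being on the classpath, the cluster settings mentioned above could in principle be set directly on the HBaseConfiguration that buildScan creates. A sketch, with placeholder values you would replace with your own cluster's:

val hbaseConf = HBaseConfiguration.create()
// Placeholder values for illustration only; substitute your own ZooKeeper quorum and caching size.
hbaseConf.set("hbase.zookeeper.quorum", "lxw1234.com")
hbaseConf.set("hbase.client.scanner.caching", "1000")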