
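Reading HBase Data from SparkSQL via a Custom External Data Source

Spark | lxw1234@qq.com

Keywords: reading HBase from SparkSQL, SparkSQL custom external data source

An earlier article described how SparkSQL can operate on HBase tables through Hive.

SparkSQL has supported custom external data sources (External DataSource) since version 1.2, so you can implement your own data source against the API. Based on Spark 1.4.0, this post briefly shows a custom SparkSQL external data source that reads an HBase table.

The table in HBase looks like this: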

create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}

put 'lxw1234','lxw1234.com','f1:c1','name1'
put 'lxw1234','lxw1234.com','f1:c2','name2'
put 'lxw1234','lxw1234.com','f2:c1','age1'
put 'lxw1234','lxw1234.com','f2:c2','age2'
put 'lxw1234','lxw1234.com','f3:c1','job1'
put 'lxw1234','lxw1234.com','f3:c2','job2'
put 'lxw1234','lxw1234.com','f3:c3','job3'

hbase(main):025:0* scan 'lxw1234'
ROW COLUMN+CELL
lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1
lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2
lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1
lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2
lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1
lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2
lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3


Start spark-shell:

sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

Run the following code:

import sqlContext._


val hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
"sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
"hbase_table_name" -> "lxw1234",
"hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"
)).load()

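// sparksql_table_schema: the definition of the table as seen by SparkSQL
// hbase_table_name: the name of the table in HBase
// hbase_table_schema: the HBase column families and columns to map into the SparkSQL table.
// The fields listed here must match those defined in sparksql_table_schema, including their order.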
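
To make the positional mapping between the two schema options concrete, here is a minimal stand-alone sketch that parses both strings and pairs them by index. It needs neither Spark nor HBase; `SchemaPairingSketch`, `SqlField`, and the method names are illustrative assumptions and are not part of the data source itself.

```scala
// Stand-alone sketch: all names here are hypothetical, chosen for illustration.
object SchemaPairingSketch {
  final case class SqlField(name: String, dataType: String)

  // Strip the surrounding parentheses and split on commas.
  private def splitFields(schema: String): Vector[String] =
    schema.trim.stripPrefix("(").stripSuffix(")").split(",").toVector.map(_.trim)

  // "(row_key string, c1 string)" -> Vector(SqlField("row_key", "string"), ...)
  def parseSqlSchema(schema: String): Vector[SqlField] =
    splitFields(schema).map { field =>
      val parts = field.split("\\s+")
      SqlField(parts(0), parts(1))
    }

  // "(:key , f1:c2)" -> Vector(":key", "f1:c2")
  def parseHBaseSchema(schema: String): Vector[String] = splitFields(schema)

  // Pair the i-th HBase column with the i-th SparkSQL field.
  def pair(sqlSchema: String, hbaseSchema: String): Vector[(String, SqlField)] = {
    val hbaseCols = parseHBaseSchema(hbaseSchema)
    val sqlFields = parseSqlSchema(sqlSchema)
    require(hbaseCols.length == sqlFields.length, "column counts differ")
    hbaseCols.zip(sqlFields)
  }
}
```

With the options used above, `:key` pairs with `row_key`, `f1:c2` with `c1`, `f2:c2` with `c2`, and `f3:c3` with `c3`. Swapping two entries in only one of the strings would silently attach the wrong column name to the data.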


scala> hbasetable.printSchema()
root
 |-- row_key: string (nullable = false)
 |-- c1: string (nullable = false)
 |-- c2: string (nullable = false)
 |-- c3: string (nullable = false)

hbasetable.registerTempTable("lxw1234")


sqlContext.sql("SELECT * from lxw1234").collect
res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])

sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect
res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])

Source code

HBaseRelation.scala

package com.lxw1234.sparksql.hbase

import java.io.Serializable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.sources.TableScan
import scala.collection.immutable.{HashMap, Map}
import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}
import org.apache.spark.sql._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import org.apache.spark.sql.sources.BaseRelation


object Resolver extends Serializable {

  import org.apache.hadoop.hbase.util.Bytes

  // Resolve one schema field from an HBase Result: ":key" maps to the row key,
  // anything else is read as "family:qualifier".
  def resolve(hbaseField: HBaseSchemaField, result: Result): Any = {
    val cfColArray = hbaseField.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    if (cfName == "" && colName == "key") resolveRowKey(result, hbaseField.fieldType)
    else resolveColumn(result, cfName, colName, hbaseField.fieldType)
  }

  // This code assumes values were written to HBase as text. Decode the bytes
  // as UTF-8 (byte-by-byte toChar breaks on non-ASCII data), then convert.
  // A missing string cell yields null; a missing int/long cell still fails.
  private def convert(bytes: Array[Byte], resultType: String): Any = {
    val text = Bytes.toString(bytes)
    resultType match {
      case "string" => text
      case "int" => text.toInt
      case "long" => text.toLong
      case other => sys.error(s"unsupported field type: $other")
    }
  }

  def resolveRowKey(result: Result, resultType: String): Any =
    convert(result.getRow, resultType)

  def resolveColumn(result: Result, columnFamily: String, columnName: String, resultType: String): Any =
    convert(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName)), resultType)
}

/**
   val hbaseDDL = s"""
      |CREATE TEMPORARY TABLE hbase_people
      |USING com.shengli.spark.hbase
      |OPTIONS (
      |  sparksql_table_schema   '(row_key string, name string, age int, job string)',
      |   hbase_table_name     'people',
      | hbase_table_schema '(:key , profile:name , profile:age , career:job )'
      |)""".stripMargin
 */
case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{

  val hbaseTableName =  hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  val hbaseTableSchema =  hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  val rowRange = hbaseProps.getOrElse("row_range", "->")
  //get start row and end row
  val range = rowRange.split("->",-1)
  val startRowKey = range(0).trim
  val endRowKey = range(1).trim

  val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field
  val registerTableFields = extractRegisterSchema(registerTableSchema)
  val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)

  val hbaseTableFields = feedTypes(tempFieldRelation)
  val fieldsRelations =  tableSchemaFieldMapping(hbaseTableFields,registerTableFields)
  val queryColumns =  getQueryTargetCloumns(hbaseTableFields)

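  def  feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) :  Array[HBaseSchemaField] = {
    // Walk the HBase fields in their declared order and use the Map only for lookup.
    // Iterating the Map itself does not guarantee declaration order once it holds
    // more than four entries, which would misalign columns and values.
    tempHBaseFields.map(k => k.copy(fieldType = mapping(k).fieldType))
  }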

  def isRowKey(field: HBaseSchemaField) : Boolean = {
    val cfColArray = field.fieldName.split(":",-1)
    val cfName = cfColArray(0)
    val colName =  cfColArray(1)
    cfName == "" && colName == "key"
  }

  //eg: f1:col1  f1:col2  f1:col3  f2:col1
  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String =
    hbaseTableFields.filterNot(isRowKey).map(_.fieldName).mkString(" ")
  lazy val schema = {
    val fields = hbaseTableFields.map{ field=>
        val name  = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
        val relatedType =  field.fieldType match  {
          case "string" =>
            SchemaType(StringType,nullable = false)
          case "int" =>
            SchemaType(IntegerType,nullable = false)
          case "long" =>
            SchemaType(LongType,nullable = false)
        }
        StructField(name,relatedType.dataType,relatedType.nullable)
    }
    StructType(fields)
  }

  def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField],  registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
       if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
       val rs = externalHBaseTable.zip(registerTable)
       rs.toMap
  }

    /**
     * Parse the SparkSQL schema that will be registered, e.g.
     *   registerTableSchema   '(rowkey string, value string, column_a string)'
     */
  def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {
         val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
         val fieldsArray = fieldsStr.split(",").map(_.trim)
         fieldsArray.map{ fildString =>
           val splitedField = fildString.split("\\s+", -1)
           RegisteredSchemaField(splitedField(0), splitedField(1))
         }
   }

  //externalTableSchema '(:key , f1:col1 )'
  def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {
        val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
        val fieldsArray = fieldsStr.split(",").map(_.trim)
        fieldsArray.map(fildString => HBaseSchemaField(fildString,""))
  }



  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  lazy val buildScan = {

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )


    val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
      var values = new ArrayBuffer[Any]()
      hbaseTableFields.foreach{field=>
        values += Resolver.resolve(field,result)
      }
      Row.fromSeq(values.toSeq)
    })
    rs
  }

  private case class SchemaType(dataType: DataType, nullable: Boolean)
//
//  private def toSqlType(hbaseSchema: Schema): SchemaType = {
//    SchemaType(StringType,true)
//  }
}
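
A note on field order: `tableSchemaFieldMapping` returns an immutable `Map`, and a Scala immutable `Map` gives no ordering guarantee. Small maps (up to four entries) happen to iterate in insertion order, while larger ones are hash-based. Any step that needs the declared column order should therefore walk the original array and use the map only for lookup. The following is a minimal sketch of that idea; `FieldOrderSketch`, `HCol`, and `SqlField` are hypothetical stand-ins for the relation's own types.

```scala
// Stand-alone sketch: the types below are simplified, hypothetical stand-ins.
object FieldOrderSketch {
  final case class HCol(name: String)
  final case class SqlField(name: String, dataType: String)

  // Walk the declared HBase columns in order; use the Map only for lookup.
  def orderedTypes(declared: Seq[HCol], mapping: Map[HCol, SqlField]): Seq[(HCol, String)] =
    declared.map(col => col -> mapping(col).dataType)
}
```

Because the result is driven by `declared`, it stays aligned with the schema no matter how many columns are mapped.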

DefaultSource.scala

package com.lxw1234.sparksql.hbase

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.RelationProvider


class DefaultSource extends RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    HBaseRelation(parameters)(sqlContext)
  }
}
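
The class name `DefaultSource` matters: when `format("com.lxw1234.sparksql.hbase")` is given a package name rather than a class name, Spark 1.x falls back to loading a class called `DefaultSource` inside that package. The sketch below only imitates that naming convention with plain class loading. It is a simplified illustration, not Spark's actual lookup code, and `ProviderLookupSketch` is a hypothetical name.

```scala
import scala.util.Try

// Simplified imitation of the lookup convention; not Spark's real implementation.
object ProviderLookupSketch {
  // Try the name as a class first, then as a package containing DefaultSource.
  def lookup(provider: String): Option[Class[_]] =
    Try[Class[_]](Class.forName(provider))
      .orElse(Try[Class[_]](Class.forName(provider + ".DefaultSource")))
      .toOption
}
```

With the jar from this post on the classpath, `ProviderLookupSketch.lookup("com.lxw1234.sparksql.hbase")` would be expected to find `com.lxw1234.sparksql.hbase.DefaultSource` through the second attempt.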

package.scala

package com.lxw1234.sparksql

import org.apache.spark.sql.SQLContext
import scala.collection.immutable.HashMap



package object hbase {

  abstract class SchemaField extends Serializable

   case class RegisteredSchemaField(fieldName: String, fieldType: String)  extends  SchemaField  with Serializable

   case class HBaseSchemaField(fieldName: String, fieldType: String)  extends  SchemaField  with Serializable

   case class Parameter(name: String)


  protected  val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  protected  val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  protected  val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  protected  val ROW_RANGE = Parameter("row_range")
  /**
   * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.
   */
  implicit class HBaseContext(sqlContext: SQLContext) {
    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
      var params = new HashMap[String, String]
      params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
      params += ( HBASE_TABLE_NAME.name -> hbaseTableName)
      params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
      //get start row and end row
      params += ( ROW_RANGE.name -> rowRange)
      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));
      //sqlContext.baseRelationToSchemaRDD(HBaseRelation(params)(sqlContext))
    }
  }

//  implicit class HBaseSchemaRDD(schemaRDD: SchemaRDD) {
//    def saveIntoTable(tableName: String): Unit = ???
//  }
}
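
The `row_range` option (default `"->"`) is a single string of the form `start->end`, which `HBaseRelation` splits into the scan's start and stop row keys. An empty side leaves that end of the scan open. The sketch below reproduces the split in isolation; `RowRangeSketch` is a hypothetical name, and the `require` check is an addition that the original code does not have.

```scala
// Stand-alone sketch of the row_range format; the object name is hypothetical.
object RowRangeSketch {
  // "start->end" -> (start, end); an empty side means "no bound on that side".
  def parse(rowRange: String): (String, String) = {
    // The -1 limit keeps trailing empty strings, so "a->" still yields two parts.
    val parts = rowRange.split("->", -1)
    require(parts.length == 2, s"expected 'start->end', got: $rowRange")
    (parts(0).trim, parts(1).trim)
  }
}
```

For example, passing `"a->z"` as the fourth argument of `sqlContext.hbaseTable(...)` (after `import com.lxw1234.sparksql.hbase._`) restricts the scan to row keys from `a` up to, but not including, `z`, since HBase stop rows are exclusive.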

 

Configuration and notes

  • The usual syntax for creating a table through an external data source in SparkSQL is:

CREATE TEMPORARY TABLE hbasetable
USING com.lxw1234.sparksql.hbase
OPTIONS (
  sparksql_table_schema   '(row_key string, c1 string, c2 string, c3 string)',
  hbase_table_name   'lxw1234',
  hbase_table_schema '(:key , f1:c2 , f2:c2 , f3:c3)'
)

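On my Spark 1.4 installation this fails: the DDL statement is handed to Hive's parser, and because Hive 0.13 has no such syntax, it is rejected.

Could this be because my Spark 1.4 package was built with Hive support?

  • Compiling the source above depends on the following HBase jars: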

hbase-client-0.96.1.1-cdh5.0.0.jar

hbase-common-0.96.1.1-cdh5.0.0.jar

hbase-protocol-0.96.1.1-cdh5.0.0.jar

hbase-server-0.96.1.1-cdh5.0.0.jar

It also needs the HBase cluster settings:

hbase.zookeeper.quorum

hbase.client.scanner.caching

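I had already added these jars and parameters to the Spark cluster's CLASSPATH when setting it up; see http://lxw1234.com/archives/2015/07/330.htm

  • This program was developed by OopsOutOfMemory against Spark 1.2; I made only minor changes.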

https://github.com/OopsOutOfMemory/spark-sql-hbase

  • This program is for learning and testing only; its performance has not been tested.
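
If the HBase cluster settings listed above are not available on the classpath, one alternative is to set them on the configuration object in code, for instance right after `HBaseConfiguration.create()` in `buildScan`. This is a sketch under assumptions: the ZooKeeper host names and the caching value are hypothetical placeholders, and `HBaseConfSketch` is not part of the original source.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration

// Sketch only: requires the HBase client jars; all values are placeholders.
object HBaseConfSketch {
  def build(): Configuration = {
    val conf = HBaseConfiguration.create()
    // Hypothetical quorum: replace with your own ZooKeeper hosts.
    conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
    // Hypothetical value: number of rows fetched per scanner round trip.
    conf.set("hbase.client.scanner.caching", "500")
    conf
  }
}
```

Keeping the settings in `hbase-site.xml` on the classpath, as the post does, avoids hard-coding cluster addresses in the jar.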


Please credit the source when reposting: lxw的大数据田地 » Reading HBase Data from SparkSQL via a Custom External Data Source

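(2) comments
  1. Hello, author. I used the code from this post to integrate with HBase and found a bug: when more than five columns are mapped, the columns and their contents no longer match in spark-sql, and the order gets scrambled. Are you aware of this bug?
    路人 2016-08-24 16:00
    • Has this been solved? I ran into it as well.
      dsj 2017-12-02 13:45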