• 0

  • 463

Spark连接Hbase并写入数据的两种方法

猿人不正经

不想写代码

6天前

前面和大家聊过Spark和Hive的集成,以及如何连接MySQL,今天主要就说下Spark连接Hbase,这可能要复杂一点,首先Hbase是列式存储的,结构可能和前面所说的数据库不一样,这样就必须要转换,而且导的依赖包也不一样,不是在maven仓库中所下的。下面就和大家说下。

导入依赖包

接下来spark集成hbase,把hbase中lib下的以下jar包拷贝到spark中的jars文件夹下面:
在这里插入图片描述
导入后重新启动Spark就可以了。
有时会报zookeeper的错误,这时可以把zookeeper-3.4.6.jar的jar包也试着导入Spark中。
spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。
连接到zookeeper可以在HBaseConfiguration实例中设置,
如果不设置,默认连接的是localhost:2181会报错:connection refused。

首先确认好已经创建过hbase的表,表名就叫account,做好以上这些准备工作之后,就可以开始连接Hbase了。

Spark连接Hbase并读取数据转化成RDD

import org.apache.hadoop.hbase.{
    HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.util.Bytes


object SparkReadHBase {
    
   def main(args: Array[String]): Unit = {
    
   	val sparkconf = new SparkConf().setAppName("SparkHBase").setMaster("local").getorCreate()
        val sc = new SparkContext(sparkconf)
        val tablename = "account"
        val conf = HBaseConfiguration.create()
     
        //设置zooKeeper集群地址
        conf.set("hbase.zookeeper.quorum","spark02,spark03,spark04")

       //设置zookeeper连接端口,默认2181
       conf.set("hbase.zookeeper.property.clientPort", "2181")
       conf.set(TableInputFormat.INPUT_TABLE, tablename)
       
       // 如果表不存在则创建表
       val admin = new HBaseAdmin(conf)
       if (!admin.isTableAvailable(tablename)) {
    
            val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
            admin.createTable(tableDesc)
        }

       //读取数据并转化成rdd
       
       val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
                           classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
                           classOf[org.apache.hadoop.hbase.client.Result])
       val count = hBaseRDD.count()
          
       println(count)
       hBaseRDD.foreach{
    case (_,result) =>{
    
         
                     //获取行键
                  val key = Bytes.toString(result.getRow)
                     //通过列族和列名获取列
                  val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
                  val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
                 
                  println("Row key:"+key+" Name:"+name+" Age:"+age)
         }
       }
        sc.stop()
     admin.close()
     }

}

使用saveAsHadoopDataset写入数据到Hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{
    SparkConf, SparkContext}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object SparkWriteHBaseOne {
    
   def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setAppName("HBaseSpark").setMaster("local")
      val sc = new SparkContext(sparkConf)
      val conf = HBaseConfiguration.create()
      //设置zooKeeper集群地址
      conf.set("hbase.zookeeper.quorum","spark02,spark03,spark04")
      //设置zookeeper连接端口,默认2181
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val tablename = "account"
      //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
      val indataRDD = sc.makeRDD(Array("1,zhangsan,23","2,Lisi,25","3,wangwu,32"))
      val rdd = indataRDD.map(_.split(',')).map{
    arr=>{
    
      
       /*一个Put对象就是一行记录,在构造方法中指定主键
        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
        * Put.add方法接收三个参数:列族,列名,数据
        */
              val put = new Put(Bytes.toBytes(arr(0).toInt))
              put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
              put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
           
            //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
            (new ImmutableBytesWritable, put)
      }
     }
      rdd.saveAsHadoopDataset(jobConf)
      sc.stop()
  }
 
}

使用saveAsNewAPIHadoopDataset写入数据到Hbase

import org.apache.hadoop.hbase.client.{
    Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._


object SparkWriteHBaseTwo {
    
   def main(args: Array[String]): Unit = {
    
     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
     val sc = new SparkContext(sparkConf)
     
     val tablename = "account"
     
     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","spark02,spark03,spark04")
     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
     
     val job = new Job(sc.hadoopConfiguration)
     
     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
     job.setOutputValueClass(classOf[Result])
     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
     
     val indataRDD = sc.makeRDD(Array("1,zhangsan,23","2,Lisi,25","3,wangwu,32"))
     val rdd = indataRDD.map(_.split(',')).map{
    arr=>{
    
       
               val put = new Put(Bytes.toBytes(arr(0)))
               put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
               put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
              (new ImmutableBytesWritable, put)
        }
      }
      
     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
}
免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

463

相关文章推荐

未登录头像

暂无评论