使用ImportTSV向HBase中导入数据

概述:

通常,在我们将数据导入HBase时,若是小批量的数据,使用HBase提供的API就可以满足需求,但是要灌入大量数据的时候,使用API的方式导入,会占用大量Regionserver的资源,影响该Regionserver上其他表的查询

为了解决这种问题,HBase官方提供了两种基于MapReduce的大量数据导入的方法:

1.ImportTSV

2.BuckLoad

数据写入流程:

在介绍今天的ImportTSV之前,先来看一下普通方式下,HBase对数据写入/删除的处理步骤,写入/删除的过程如下图:

1.客户端向HRegionServer发起请求,要求写入数据

2.HRegionServer先将数据写入HLog中,即图中WAL过程

3.HRegionServer将数据写入Memstore中,做缓存

4.当内存中的数据达到阈值时,将数据Flush(刷写)到硬盘中,即写入到HFile文件中,并同时删除内存和HLog中的历史数据


HBase在HDFS的存储结构:

我们知道,HBase的数据最终是以HFile的形式存储在HDFS中的,在HDFS中HBase的目录结构如下

/HBase指定的存储目录/HBase表/表分布的Region/列族/HFile

举个例子:

现有一张表user,有两个列族info和desc,user的region分布情况如下:


在HDFS中user表存储路径如下所示,无视两个.开头的文件,其他的目录名称均为上图中,Region对应的code码


点进去一个Region目录,目录内容如下图:

其中info和desc这两个目录,就是HBase创建表时定义的列族,HFile存放在该目录下


我们去看一下info列族下的HFile

说了这么多,现在来了解一下ImportTsv

初识ImportTSV:

ImportTsv是HBase提供的一个命令行工具,将存储在HDFS上的数据文件,通过指定的分隔符解析后,导入到HBase表中

这样的方式导入数据与正常写入流程不同的是,跳过了WAL、Memcache与Flush的过程,直接将HFile文件移动到HBase表空间目录下即可,不影响HRegionServer的性能

ImportTsv包含两种方式将数据导入到HBase表中:

第一种:使用TableOutputformat在reduce端插入数据,但是这种方式导入大批量数据的时候有可能会存在问题,尤其是列比较多的宽表导入的时候,会出现RegionTooBusyException,导致数据丢失,因此建议在数据量不是特别大并且列不是特别多的情况下使用

第二种推荐:先使用MarpReduce程序生成HFile文件,再执行LoadIncrementalHFiles命令,将文件移动到HBase表中对应的存储目录下

ImportTsv用法:

命令如下

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

默认情况下,是第一种导入方式,即将数据在reduce端直接导入到HBase中

参数说明:

  • -Dimporttsv.columns=a,b,c    指定导入到HBase表中的列
  • <tablename>    表名
  • <inputdir>    数据文件在HDFS上的目录地址直接写目录地址即可,例: /data/data.txt

上述这些参数,只是默认直接导入所需基本的参数

如果数据文件不是使用默认的分隔符 \t 进行分割,需要指定文件分隔符:

  • -Dimporttsv.separator=”,”        指定分隔符为逗号

如果使用第二种方法,要生成HFile文件,添加如下参数:

  •  -Dimporttsv.bulk.output=/path/for/output        指定生成的HFile文件在HDFS的存储目录地址

除此之外,还有一些运行参数可供选择:

  • -Dimporttsv.skip.bad.lines=true / false        在导入过程中,如果有不符合分割标准的行,被称之为badlines,设置是否跳过,如果不跳过,则Map-Reduce任务停止
  • -Dimporttsv.timestamp=currentTimeAsLong        指定导入的时间戳
  • -Dimporttsv.mapper.class=my.Mapper            指定用户自定义的Mapper程序,用于替换默认的Mapper:org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
  • -Dmapreduce.job.name=jobName            用户指定MapReduce任务名称
  • -Dcreate.table=  yes/no                如果HBase中没有创建表,是否使用ImportTSV工具创建该表,如果设置为no,则在HBase中表必须存在
  • -Dno.strict= true / false                是否忽略HBASE表中的列族检查,默认为false

如果考虑提升导入性能,可以参考以下参数:

  • -Dmapreduce.map.speculative=false            关闭Map端推断
  • -Dmapreduce.reduce.speculative=false             关闭Reduce端推断

实例:

现有一个数据文件user.txt,文件内容如下:

注:图片所示数据均为假数据,如有雷同,纯属巧合!


对应的列分别为:手机号反转,姓名,性别,年龄,身份证号,户籍地址,生日

现要将数据文件导入到user表中,user表建表语句如下:

info列族为用户基本信息

desc为用户其他描述信息

create 'user',
{NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', VERSIONS => '1', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false',REPLICATION_SCOPE=>'1'},
{NAME => 'desc', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', VERSIONS => '1', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536',IN_MEMORY => 'false', BLOCKCACHE => 'false',REPLICATION_SCOPE=>'1'},
SPLITS => ['0','1', '2', '3', '4','5','6','7','8','9']

需求:

需要将user.txt文件中数据导入到user表中的info列族,row-key为用户手机号反转

第一种方式导入(简单介绍):

指定分隔符为逗号

指定导入的列:第一列为HBASE_ROW_KEY,其他列都是info列族下的name,gender,age,idNumber,address,birthday列

指定需要导入的表名:user

指定数据文件在HDFS上的目录地址:/louisvv/user.txt

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,info:idNumber,info:address,info:birthday user /louisvv/user.txt

第二种方式导入(重点介绍):

指定分隔符为逗号

指定产生的HFile在HDFS上存放的临时目录:/louisvv/hfile_tmp

指定导入的列:第一列为HBASE_ROW_KEY,其他列都是info列族下的name,gender,age,idNumber,address,birthday列

指定需要导入的表名:user

指定数据文件在HDFS上的目录地址:/louisvv/user.txt

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.bulk.output=/louisvv/hfile_tmp -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,info:idNumber,info:address,info:birthday user /yw/user.txt

截取Map-Reduce部分日志如下:

可以看到MR的基本信息,共2000002(二百万)条数据,ImportTsv 处理的Bad Lines=0,即所有数据都成功的导入了,整个过程大约花费了1分20秒左右的时间,可以看出ImportTsv的效率还是挺快的

2018-10-15 16:54:24,570 INFO  [main] mapreduce.Job:  map 0% reduce 0%
2018-10-15 16:54:35,670 INFO  [main] mapreduce.Job:  map 20% reduce 0%
2018-10-15 16:54:38,688 INFO  [main] mapreduce.Job:  map 33% reduce 0%
2018-10-15 16:54:45,730 INFO  [main] mapreduce.Job:  map 50% reduce 0%
2018-10-15 16:54:54,795 INFO  [main] mapreduce.Job:  map 70% reduce 0%
2018-10-15 16:54:57,812 INFO  [main] mapreduce.Job:  map 83% reduce 0%
2018-10-15 16:55:05,853 INFO  [main] mapreduce.Job:  map 100% reduce 0%
2018-10-15 16:55:13,891 INFO  [main] mapreduce.Job:  map 100% reduce 69%
2018-10-15 16:55:16,905 INFO  [main] mapreduce.Job:  map 100% reduce 73%
2018-10-15 16:55:19,920 INFO  [main] mapreduce.Job:  map 100% reduce 76%
2018-10-15 16:55:22,935 INFO  [main] mapreduce.Job:  map 100% reduce 80%
2018-10-15 16:55:25,949 INFO  [main] mapreduce.Job:  map 100% reduce 83%
2018-10-15 16:55:28,964 INFO  [main] mapreduce.Job:  map 100% reduce 86%
2018-10-15 16:55:31,976 INFO  [main] mapreduce.Job:  map 100% reduce 90%
2018-10-15 16:55:34,989 INFO  [main] mapreduce.Job:  map 100% reduce 93%
2018-10-15 16:55:38,003 INFO  [main] mapreduce.Job:  map 100% reduce 97%
2018-10-15 16:55:41,014 INFO  [main] mapreduce.Job:  map 100% reduce 100%
2018-10-15 16:55:41,022 INFO  [main] mapreduce.Job: Job job_1539592382677_0007 completed successfully
2018-10-15 16:55:41,149 INFO  [main] mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=437400267
		FILE: Number of bytes written=875354426
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=133551726
		HDFS: Number of bytes written=188589865
		HDFS: Number of read operations=11
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=3
	Job Counters 
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=302200
		Total time spent by all reduces in occupied slots (ms)=281168
		Total time spent by all map tasks (ms)=37775
		Total time spent by all reduce tasks (ms)=35146
		Total vcore-milliseconds taken by all map tasks=37775
		Total vcore-milliseconds taken by all reduce tasks=35146
		Total megabyte-milliseconds taken by all map tasks=154726400
		Total megabyte-milliseconds taken by all reduce tasks=143958016
	Map-Reduce Framework
		Map input records=2000002
		Map output records=2000002
		Map output bytes=431420770
		Map output materialized bytes=437400273
		Input split bytes=182
		Combine input records=2000002
		Combine output records=1999629
		Reduce input groups=1999281
		Reduce shuffle bytes=437400273
		Reduce input records=1999629
		Reduce output records=11995686
		Spilled Records=3999258
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=4894
		CPU time spent (ms)=206900
		Physical memory (bytes) snapshot=6744469504
		Virtual memory (bytes) snapshot=16308625408
		Total committed heap usage (bytes)=7036469248
	ImportTsv
		Bad Lines=0
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=133551544
	File Output Format Counters 
		Bytes Written=188589865

接下来要将生成的临时HFile文件导入到HBase中去

指定刚刚的HFile目录地址:/louisvv/hfile_tmp

指定需要导入的表:user

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /louisvv/hfile_tmp user

这里贴出部分日志:

整个过程还是很快的,基本在2-3秒就完成了

2018-10-15 17:01:52,416 INFO  [main-SendThread(ai-main:2181)] zookeeper.ClientCnxn: Session establishment complete on server ai-main/192.168.1.22:2181, sessionid = 0x166666be050001d, negotiated timeout = 60000
2018-10-15 17:01:52,781 WARN  [main] mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://cluster/louisvv/hfile_tmp/_SUCCESS
2018-10-15 17:01:53,249 INFO  [LoadIncrementalHFiles-0] hfile.CacheConfig: CacheConfig:disabled
2018-10-15 17:01:53,300 INFO  [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy]
2018-10-15 17:01:53,331 INFO  [LoadIncrementalHFiles-0] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/louisvv/hfile_tmp/info/1c6888ef7b014b90908e48c47597ca0a first=13010007758 last=18999999393
2018-10-15 17:01:53,742 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
2018-10-15 17:01:53,743 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x166666be050001d
2018-10-15 17:01:53,744 INFO  [main] zookeeper.ZooKeeper: Session: 0x166666be050001d closed
2018-10-15 17:01:53,745 INFO  [main-EventThread] zookeeper.ClientCnxn: EventThread shut down

验证:

使用hbase shell对数据进行验证,由于数据过多,不能进行scan操作,随机查询一个用户即可

这有个用户,row-key为68398017181,我们进行一下get


查询结果如下:

这里看到,中文在HBase中是以十六进制保存的,我们需要验证一下name字段是否和我们查询的相符即可

hbase(main):003:0> get 'user','68398017181'
COLUMN                                          CELL                                                                                                                                       
 info:address                                   timestamp=1539756695119, value=\xE6\x89\xB6\xE9\xA3\x8E\xE5\x8E\xBF                                                                        
 info:age                                       timestamp=1539756695119, value=36                                                                                                          
 info:birthday                                  timestamp=1539756695119, value=198xxxxx                                                                                                   
 info:gender                                    timestamp=1539756695119, value=\xE5\xA5\xB3                                                                                                
 info:idNumber                                  timestamp=1539756695119, value=610XXXXXXXXXXX5584                                                                                          
 info:name                                      timestamp=1539756695119, value=\xE5\xBA\x9E\xE7\xBE\x8E\xE8\xAF\x97                                                                        
6 row(s) in 0.0530 seconds

我使用python程序进行转码:

结果正确,则数据导入正确

>>> print '\xE5\xBA\x9E\xE7\xBE\x8E\xE8\xAF\x97'.decode('utf-8')
庞美诗

至此,HBase ImportTsv介绍完毕,我们下篇文章见~

亲,看完了点个赞呗!!!

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

8 Responses

  1. coder说道:

    可以可以

  2. 匿名说道:

    不错啊

  3. 匿名说道:

    这个做的好啊

  4. 匿名说道:

    有空值时怎么处理的?

    • 赫墨拉说道:

      您好,上面文章有写,通过设置参数,设置在导入过程中对badline的数据如何进行处理
      -Dimporttsv.skip.bad.lines=true / false 在导入过程中,如果有不符合分割标准的行,被称之为badlines,设置是否跳过,如果不跳过,则Map-Reduce任务停止
      如果指定的列为row-key,row-key为空,如果设置了上面参数为True,导入时会跳过该行数据的导入
      如果其他列为空,则导入时与其对应的数据相同,也为空,理论上Hbase的某列数据可以为空

  5. 匿名说道:

    你好,我想问一下,为什么我执行完ImportTsv后是只读不写的呢?
    File Input Format Counters
    Bytes Read=10195514611
    File Output Format Counters
    Bytes Written=0

  6. 匿名说道:

    有遇到must use subscript when assigning associative array这个错误吗

赫墨拉进行回复 取消回复

邮箱地址不会被公开。