使用ImportTSV向HBase中导入数据
概述:
通常,在我们将数据导入HBase时,若是小批量的数据,使用HBase提供的API就可以满足需求,但是要灌入大量数据的时候,使用API的方式导入,会占用大量Regionserver的资源,影响该Regionserver上其他表的查询
为了解决这种问题,HBase官方提供了两种基于MapReduce的大量数据导入的方法:
1.ImportTSV
2.BuckLoad
数据写入流程:
在介绍今天的ImportTSV之前,先来看一下普通方式下,HBase对数据写入/删除的处理步骤,写入/删除的过程如下图:
1.客户端向HRegionServer发起请求,要求写入数据
2.HRegionServer先将数据写入HLog中,即图中WAL过程
3.HRegionServer将数据写入Memstore中,做缓存
4.当内存中的数据达到阈值时,将数据Flush(刷写)到硬盘中,即写入到HFile文件中,并同时删除内存和HLog中的历史数据
HBase在HDFS的存储结构:
我们知道,HBase的数据最终是以HFile的形式存储在HDFS中的,在HDFS中HBase的目录结构如下
/HBase指定的存储目录/HBase表/表分布的Region/列族/HFile
举个例子:
现有一张表user,有两个列族info和desc,user的region分布情况如下:
在HDFS中user表存储路径如下所示,无视两个.开头的文件,其他的目录名称均为上图中,Region对应的code码
点进去一个Region目录,目录内容如下图:
其中info和desc这两个目录,就是HBase创建表时定义的列族,HFile存放在该目录下
我们去看一下info列族下的HFile
说了这么多,现在来了解一下ImportTsv
初识ImportTSV:
ImportTsv是HBase提供的一个命令行工具,将存储在HDFS上的数据文件,通过指定的分隔符解析后,导入到HBase表中
这样的方式导入数据与正常写入流程不同的是,跳过了WAL、Memcache与Flush的过程,直接将HFile文件移动到HBase表空间目录下即可,不影响HRegionServer的性能
ImportTsv包含两种方式将数据导入到HBase表中:
第一种:使用TableOutputformat在reduce端插入数据,但是这种方式导入大批量数据的时候有可能会存在问题,尤其是列比较多的宽表导入的时候,会出现RegionTooBusyException,导致数据丢失,因此建议在数据量不是特别大并且列不是特别多的情况下使用
第二种(推荐):先使用MarpReduce程序生成HFile文件,再执行LoadIncrementalHFiles命令,将文件移动到HBase表中对应的存储目录下
ImportTsv用法:
命令如下
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
默认情况下,是第一种导入方式,即将数据在reduce端直接导入到HBase中
参数说明:
-
-Dimporttsv.columns=a,b,c 指定导入到HBase表中的列
- <tablename> 表名
- <inputdir> 数据文件在HDFS上的目录地址(直接写目录地址即可,例: /data/data.txt )
上述这些参数,只是默认直接导入所需基本的参数
如果数据文件不是使用默认的分隔符 \t 进行分割,需要指定文件分隔符:
- -Dimporttsv.separator=”,” 指定分隔符为逗号
如果使用第二种方法,要生成HFile文件,添加如下参数:
- -Dimporttsv.bulk.output=/path/for/output 指定生成的HFile文件在HDFS的存储目录地址
除此之外,还有一些运行参数可供选择:
- -Dimporttsv.skip.bad.lines=true / false 在导入过程中,如果有不符合分割标准的行,被称之为badlines,设置是否跳过,如果不跳过,则Map-Reduce任务停止
- -Dimporttsv.timestamp=currentTimeAsLong 指定导入的时间戳
- -Dimporttsv.mapper.class=my.Mapper 指定用户自定义的Mapper程序,用于替换默认的Mapper:org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
- -Dmapreduce.job.name=jobName 用户指定MapReduce任务名称
- -Dcreate.table= yes/no 如果HBase中没有创建表,是否使用ImportTSV工具创建该表,如果设置为no,则在HBase中表必须存在
- -Dno.strict= true / false 是否忽略HBASE表中的列族检查,默认为false
如果考虑提升导入性能,可以参考以下参数:
- -Dmapreduce.map.speculative=false 关闭Map端推断
- -Dmapreduce.reduce.speculative=false 关闭Reduce端推断
实例:
现有一个数据文件user.txt,文件内容如下:
注:图片所示数据均为假数据,如有雷同,纯属巧合!
对应的列分别为:手机号反转,姓名,性别,年龄,身份证号,户籍地址,生日
现要将数据文件导入到user表中,user表建表语句如下:
info列族为用户基本信息
desc为用户其他描述信息
create 'user', {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', VERSIONS => '1', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false',REPLICATION_SCOPE=>'1'}, {NAME => 'desc', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', VERSIONS => '1', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536',IN_MEMORY => 'false', BLOCKCACHE => 'false',REPLICATION_SCOPE=>'1'}, SPLITS => ['0','1', '2', '3', '4','5','6','7','8','9']
需求:
需要将user.txt文件中数据导入到user表中的info列族,row-key为用户手机号反转
第一种方式导入(简单介绍):
指定分隔符为逗号
指定导入的列:第一列为HBASE_ROW_KEY,其他列都是info列族下的name,gender,age,idNumber,address,birthday列
指定需要导入的表名:user
指定数据文件在HDFS上的目录地址:/louisvv/user.txt
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,info:idNumber,info:address,info:birthday user /louisvv/user.txt
第二种方式导入(重点介绍):
指定分隔符为逗号
指定产生的HFile在HDFS上存放的临时目录:/louisvv/hfile_tmp
指定导入的列:第一列为HBASE_ROW_KEY,其他列都是info列族下的name,gender,age,idNumber,address,birthday列
指定需要导入的表名:user
指定数据文件在HDFS上的目录地址:/louisvv/user.txt
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.bulk.output=/louisvv/hfile_tmp -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,info:idNumber,info:address,info:birthday user /yw/user.txt
截取Map-Reduce部分日志如下:
可以看到MR的基本信息,共2000002(二百万)条数据,ImportTsv 处理的Bad Lines=0,即所有数据都成功的导入了,整个过程大约花费了1分20秒左右的时间,可以看出ImportTsv的效率还是挺快的
2018-10-15 16:54:24,570 INFO [main] mapreduce.Job: map 0% reduce 0% 2018-10-15 16:54:35,670 INFO [main] mapreduce.Job: map 20% reduce 0% 2018-10-15 16:54:38,688 INFO [main] mapreduce.Job: map 33% reduce 0% 2018-10-15 16:54:45,730 INFO [main] mapreduce.Job: map 50% reduce 0% 2018-10-15 16:54:54,795 INFO [main] mapreduce.Job: map 70% reduce 0% 2018-10-15 16:54:57,812 INFO [main] mapreduce.Job: map 83% reduce 0% 2018-10-15 16:55:05,853 INFO [main] mapreduce.Job: map 100% reduce 0% 2018-10-15 16:55:13,891 INFO [main] mapreduce.Job: map 100% reduce 69% 2018-10-15 16:55:16,905 INFO [main] mapreduce.Job: map 100% reduce 73% 2018-10-15 16:55:19,920 INFO [main] mapreduce.Job: map 100% reduce 76% 2018-10-15 16:55:22,935 INFO [main] mapreduce.Job: map 100% reduce 80% 2018-10-15 16:55:25,949 INFO [main] mapreduce.Job: map 100% reduce 83% 2018-10-15 16:55:28,964 INFO [main] mapreduce.Job: map 100% reduce 86% 2018-10-15 16:55:31,976 INFO [main] mapreduce.Job: map 100% reduce 90% 2018-10-15 16:55:34,989 INFO [main] mapreduce.Job: map 100% reduce 93% 2018-10-15 16:55:38,003 INFO [main] mapreduce.Job: map 100% reduce 97% 2018-10-15 16:55:41,014 INFO [main] mapreduce.Job: map 100% reduce 100% 2018-10-15 16:55:41,022 INFO [main] mapreduce.Job: Job job_1539592382677_0007 completed successfully 2018-10-15 16:55:41,149 INFO [main] mapreduce.Job: Counters: 50 File System Counters FILE: Number of bytes read=437400267 FILE: Number of bytes written=875354426 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=133551726 HDFS: Number of bytes written=188589865 HDFS: Number of read operations=11 HDFS: Number of large read operations=0 HDFS: Number of write operations=3 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=302200 Total time spent by all reduces in occupied slots (ms)=281168 Total time spent by all map tasks (ms)=37775 Total time spent by all reduce tasks (ms)=35146 Total vcore-milliseconds taken by all map tasks=37775 Total vcore-milliseconds taken by all reduce tasks=35146 Total megabyte-milliseconds taken by all map tasks=154726400 Total megabyte-milliseconds taken by all reduce tasks=143958016 Map-Reduce Framework Map input records=2000002 Map output records=2000002 Map output bytes=431420770 Map output materialized bytes=437400273 Input split bytes=182 Combine input records=2000002 Combine output records=1999629 Reduce input groups=1999281 Reduce shuffle bytes=437400273 Reduce input records=1999629 Reduce output records=11995686 Spilled Records=3999258 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=4894 CPU time spent (ms)=206900 Physical memory (bytes) snapshot=6744469504 Virtual memory (bytes) snapshot=16308625408 Total committed heap usage (bytes)=7036469248 ImportTsv Bad Lines=0 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=133551544 File Output Format Counters Bytes Written=188589865
接下来要将生成的临时HFile文件导入到HBase中去
指定刚刚的HFile目录地址:/louisvv/hfile_tmp
指定需要导入的表:user
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /louisvv/hfile_tmp user
这里贴出部分日志:
整个过程还是很快的,基本在2-3秒就完成了
2018-10-15 17:01:52,416 INFO [main-SendThread(ai-main:2181)] zookeeper.ClientCnxn: Session establishment complete on server ai-main/192.168.1.22:2181, sessionid = 0x166666be050001d, negotiated timeout = 60000 2018-10-15 17:01:52,781 WARN [main] mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://cluster/louisvv/hfile_tmp/_SUCCESS 2018-10-15 17:01:53,249 INFO [LoadIncrementalHFiles-0] hfile.CacheConfig: CacheConfig:disabled 2018-10-15 17:01:53,300 INFO [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy] 2018-10-15 17:01:53,331 INFO [LoadIncrementalHFiles-0] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/louisvv/hfile_tmp/info/1c6888ef7b014b90908e48c47597ca0a first=13010007758 last=18999999393 2018-10-15 17:01:53,742 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService 2018-10-15 17:01:53,743 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x166666be050001d 2018-10-15 17:01:53,744 INFO [main] zookeeper.ZooKeeper: Session: 0x166666be050001d closed 2018-10-15 17:01:53,745 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
验证:
使用hbase shell对数据进行验证,由于数据过多,不能进行scan操作,随机查询一个用户即可
这有个用户,row-key为68398017181,我们进行一下get
查询结果如下:
这里看到,中文在HBase中是以十六进制保存的,我们需要验证一下name字段是否和我们查询的相符即可
hbase(main):003:0> get 'user','68398017181' COLUMN CELL info:address timestamp=1539756695119, value=\xE6\x89\xB6\xE9\xA3\x8E\xE5\x8E\xBF info:age timestamp=1539756695119, value=36 info:birthday timestamp=1539756695119, value=198xxxxx info:gender timestamp=1539756695119, value=\xE5\xA5\xB3 info:idNumber timestamp=1539756695119, value=610XXXXXXXXXXX5584 info:name timestamp=1539756695119, value=\xE5\xBA\x9E\xE7\xBE\x8E\xE8\xAF\x97 6 row(s) in 0.0530 seconds
我使用python程序进行转码:
结果正确,则数据导入正确
>>> print '\xE5\xBA\x9E\xE7\xBE\x8E\xE8\xAF\x97'.decode('utf-8') 庞美诗
至此,HBase ImportTsv介绍完毕,我们下篇文章见~
亲,看完了点个赞呗!!!
可以可以
不错啊
这个做的好啊
多谢多谢
有空值时怎么处理的?
您好,上面文章有写,通过设置参数,设置在导入过程中对badline的数据如何进行处理
-Dimporttsv.skip.bad.lines=true / false 在导入过程中,如果有不符合分割标准的行,被称之为badlines,设置是否跳过,如果不跳过,则Map-Reduce任务停止
如果指定的列为row-key,row-key为空,如果设置了上面参数为True,导入时会跳过该行数据的导入
如果其他列为空,则导入时与其对应的数据相同,也为空,理论上Hbase的某列数据可以为空
你好,我想问一下,为什么我执行完ImportTsv后是只读不写的呢?
File Input Format Counters
Bytes Read=10195514611
File Output Format Counters
Bytes Written=0
有遇到must use subscript when assigning associative array这个错误吗