Table of Contents
- HBase API
  - The extracted utility class
  - Namespace DDL
  - Table DDL
  - Table CRUD
- Filters
  - Single-column value filter: SingleColumnValueFilter
  - Structural filter: FilterList
  - KeyValue metadata filters: FamilyFilter, QualifierFilter, ColumnPrefixFilter, MultipleColumnPrefixFilter, ColumnRangeFilter
  - RowKey filter: RowFilter
  - Utility filter: FirstKeyOnlyFilter
  - Paging filter: PageFilter
- Comparators
  - Column value comparators: RegexStringComparator, SubstringComparator, BinaryPrefixComparator, BinaryComparator
- Complete filter and comparator code
- HBase storage mechanism
- HBase addressing flow
- HBase read/write flow
- Bloom filter
- Homework
- HBase and Hive integration
  - Why integrate
  - Hive-To-Hbase
  - Hbase-To-Hive
HBase API
The extracted utility class
package com.xx.HbaseAPI.Utile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;

public class HbaseUtile {
    static Connection connection;

    static {
        // Use HBaseConfiguration.create() so hbase-default.xml/hbase-site.xml are loaded
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "qianfeng01:2181,qianfeng02:2181,qianfeng03:2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Admin getAdmin() {
        try {
            return connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void closeAdmin(Admin admin) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static Table getTable(String tableName) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    public static void closeTable(Table table) {
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Print rowkey, column family and value of every cell in a Result
    public static void printCell(Result result) throws IOException {
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
            Cell current = cellScanner.current();
            System.out.println(new String(CellUtil.cloneRow(current)) + "\t"
                    + new String(CellUtil.cloneFamily(current)) + "\t"
                    + new String(CellUtil.cloneValue(current)));
        }
    }
}
Namespace DDL
package com.xx.HbaseAPI;

import com.xx.HbaseAPI.Utile.HbaseUtile;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

public class NameSpaceDDL {
    static Admin admin;

    @Before
    public void getAdmin() {
        admin = HbaseUtile.getAdmin();
    }

    @After
    public void closeAdmin() {
        HbaseUtile.closeAdmin(admin);
    }

    @Test
    public void CreateNamespace() throws IOException {
        NamespaceDescriptor ns1 = NamespaceDescriptor.create("ns1").build();
        admin.createNamespace(ns1);
    }

    @Test
    public void listNamespace() throws IOException {
        NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
        for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
            System.out.println(namespaceDescriptor.getName());
        }
    }

    @Test
    public void listNamespaceTables() throws IOException {
        TableName[] ns1s = admin.listTableNamesByNamespace("ns1");
        for (TableName ns1 : ns1s) {
            System.out.println(ns1.getNameAsString());
        }
    }

    @Test
    public void describeNamespace() throws IOException {
        NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1");
        Map<String, String> map = ns1.getConfiguration();
        Set<String> keys = map.keySet();
        for (String key : keys) {
            System.out.println(key + "=" + map.get(key));
        }
    }

    @Test
    public void alterNamespaceAdd() throws IOException {
        NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1");
        ns1.setConfiguration("city", "shenzhen");
        ns1.setConfiguration("street", "hangcheng");
        admin.modifyNamespace(ns1);
    }

    @Test
    public void alterNamespaceModify() throws IOException {
        NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1");
        ns1.setConfiguration("city", "datong");
        ns1.setConfiguration("street", "xxx");
        admin.modifyNamespace(ns1);
    }

    @Test
    public void alterNamespaceRemove() throws IOException {
        NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1");
        ns1.removeConfiguration("city");
        admin.modifyNamespace(ns1);
    }

    @Test
    public void dropNamespace() throws IOException {
        admin.deleteNamespace("ns1");
    }
}
Table DDL
package com.xx.HbaseAPI;

import com.xx.HbaseAPI.Utile.HbaseUtile;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class TableDDL {
    Admin admin;

    @Before
    public void getAdmin() {
        admin = HbaseUtile.getAdmin();
    }

    @After
    public void closeAdmin() {
        HbaseUtile.closeAdmin(admin);
    }

    @Test
    public void createTable() throws IOException {
        HTableDescriptor xiao = new HTableDescriptor(TableName.valueOf("ns1:xiao"));
        HColumnDescriptor f1 = new HColumnDescriptor("f1");
        f1.setBloomFilterType(BloomType.ROWCOL);
        f1.setMaxVersions(5);
        f1.setTimeToLive(3600 * 24);
        xiao.addFamily(f1);
        admin.createTable(xiao);
    }

    @Test
    public void describeTable() throws IOException {
        HTableDescriptor ns1 = admin.getTableDescriptor(TableName.valueOf("ns1:xiao"));
        HColumnDescriptor[] columnFamilies = ns1.getColumnFamilies();
        for (HColumnDescriptor columnFamily : columnFamilies) {
            System.out.println(columnFamily.getMaxVersions());
            System.out.println(columnFamily.getTimeToLive());
            System.out.println(columnFamily.getNameAsString());
            System.out.println(columnFamily.isBlockCacheEnabled());
        }
    }

    @Test
    public void alterTableAppendColumnFamily() throws IOException {
        HColumnDescriptor f2 = new HColumnDescriptor("f2");
        HColumnDescriptor f3 = new HColumnDescriptor("f3");
        HColumnDescriptor f4 = new HColumnDescriptor("f4");
        f2.setTimeToLive(3600 * 24 * 7);
        f2.setMaxVersions(5);
        f2.setBloomFilterType(BloomType.ROWCOL);
        admin.addColumn(TableName.valueOf("ns1:xiao"), f2);
        admin.addColumn(TableName.valueOf("ns1:xiao"), f3);
        admin.addColumn(TableName.valueOf("ns1:xiao"), f4);
    }

    @Test
    public void alterTableModifyColumnFamily() throws IOException {
        HColumnDescriptor f2 = new HColumnDescriptor("f2");
        f2.setMaxVersions(3);
        admin.modifyColumn(TableName.valueOf("ns1:xiao"), f2);
    }

    @Test
    public void alterTableDeleteColumnFamily() throws IOException {
        admin.deleteColumn(TableName.valueOf("ns1:xiao"), Bytes.toBytes("f2"));
    }

    @Test
    public void listAllTable() throws IOException {
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            System.out.println(tableName.getNameAsString());
        }
    }

    @Test
    public void dropTable() throws IOException {
        TableName tableName = TableName.valueOf("ns1:xiao");
        if (admin.tableExists(tableName)) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
    }
}
Table CRUD
package com.xx.HbaseAPI;

import com.xx.HbaseAPI.Utile.HbaseUtile;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TableCRUD {
    Table table;

    @Before
    public void getTable() {
        table = HbaseUtile.getTable("ns1:xiao");
    }

    @After
    public void closeTable() {
        HbaseUtile.closeTable(table);
    }

    @Test
    public void putData() throws IOException {
        Put put = new Put(Bytes.toBytes("rk000001"));
        put.addColumn(Bytes.toBytes("f1"), "name".getBytes(), "zhang".getBytes());
        table.put(put);
    }

    @Test
    public void putBatchData() throws IOException {
        List<Put> puts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            // zero-pad the rowkey so rowkeys sort in numeric order
            String rowkey;
            if (i < 10) {
                rowkey = "rk00000" + i;
            } else if (i < 100) {
                rowkey = "rk0000" + i;
            } else {
                rowkey = "rk000" + i;
            }
            Put put = new Put(rowkey.getBytes());
            put.addColumn("f1".getBytes(), "name".getBytes(), ("xin" + i).getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), ((int) (Math.random() * 100) + "").getBytes());
            String sex;
            // (int) (Math.random() * 2) is 0 or 1, so roughly half of the rows get each value
            if ((int) (Math.random() * 2) == 1) {
                sex = "f";
            } else {
                sex = "m";
            }
            put.addColumn("f1".getBytes(), "sex".getBytes(), sex.getBytes());
            put.addColumn("f1".getBytes(), "sal".getBytes(), ((int) (Math.random() * 9000 + 1000) + "").getBytes());
            puts.add(put);
        }
        // submit the whole batch once, after the loop
        table.put(puts);
    }

    @Test
    public void getSingleRowData() throws IOException {
        Get get = new Get(Bytes.toBytes("rk000001"));
        Result result = table.get(get);
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
            Cell current = cellScanner.current();
            System.out.println(new String(CellUtil.cloneRow(current)) + "\t"
                    + new String(CellUtil.cloneFamily(current)) + "\t"
                    + new String(CellUtil.cloneValue(current)));
        }
    }

    @Test
    public void getMultipleData() throws IOException {
        List<Get> gets = new ArrayList<>();
        gets.add(new Get("rk000004".getBytes()));
        gets.add(new Get("rk000019".getBytes()));
        gets.add(new Get("rk000003".getBytes()));
        gets.add(new Get("rk000007".getBytes()));
        Result[] results = table.get(gets);
        for (Result result : results) {
            HbaseUtile.printCell(result);
        }
    }

    @Test
    public void scanData() throws IOException {
        // append "\001" to the stop row so the range effectively includes rk000010
        Scan scan = new Scan("rk000001".getBytes(), "rk000010\001".getBytes());
        ResultScanner scanner = table.getScanner(scan);
        Iterator<Result> iterator = scanner.iterator();
        while (iterator.hasNext()) {
            Result next = iterator.next();
            HbaseUtile.printCell(next);
        }
    }

    @Test
    public void deleteCell() throws IOException {
        Delete delete = new Delete("rk000001".getBytes());
        delete.addColumn("f1".getBytes(), "name".getBytes());
        table.delete(delete);
    }

    @Test
    public void deleteRow() throws IOException {
        Delete delete = new Delete("rk000001".getBytes());
        table.delete(delete);
    }
}
Filters
Why use filters: without filters, HBase can only scan an entire table or a range of rowkeys; there is no way to narrow a query by any other condition.
Single-column value filter
SingleColumnValueFilter
Structural filter
FilterList
KeyValue metadata filters
1. FamilyFilter
2. QualifierFilter
3. ColumnPrefixFilter
4. MultipleColumnPrefixFilter
5. ColumnRangeFilter
RowKey filter
RowFilter
Utility filter
FirstKeyOnlyFilter
Paging filter
PageFilter
Comparators
Column value comparators
1. RegexStringComparator
2. SubstringComparator
3. BinaryPrefixComparator
4. BinaryComparator
Complete filter and comparator code
package com.qf.hbase.api;

import com.qf.hbase.util.HbaseUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;

public class FilterDemo {
    Table table = null;

    @Before
    public void getTable() {
        table = HbaseUtil.getTable("ns1:emp");
    }

    @After
    public void closeTable() {
        HbaseUtil.closeTable(table);
    }

    @Test
    public void testSingleColumnValueFilter() throws IOException {
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                "f1".getBytes(), "ename".getBytes(), CompareFilter.CompareOp.EQUAL, "zhaoyun100".getBytes());
        // skip rows that do not contain the column at all
        filter.setFilterIfMissing(true);
        Scan scan = new Scan();
        scan.setFilter(filter);
        executeScan(scan);
    }

    @Test
    public void testFilterList() throws IOException {
        SingleColumnValueFilter enameFilter = new SingleColumnValueFilter(
                "f1".getBytes(), "ename".getBytes(), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("zhaoyun10"));
        SingleColumnValueFilter ageFilter = new SingleColumnValueFilter(
                "f1".getBytes(), "age".getBytes(), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("23"));
        enameFilter.setFilterIfMissing(true);
        ageFilter.setFilterIfMissing(true);
        // MUST_PASS_ONE = OR semantics; MUST_PASS_ALL would be AND
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        filterList.addFilter(enameFilter);
        filterList.addFilter(ageFilter);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        executeScan(scan);
    }

    @Test
    public void testComparator() throws IOException {
        BinaryComparator comparator = new BinaryComparator("zhaoyun10".getBytes());
        SingleColumnValueFilter enameFilter = new SingleColumnValueFilter(
                "f1".getBytes(), "ename".getBytes(), CompareFilter.CompareOp.EQUAL, comparator);
        enameFilter.setFilterIfMissing(true);
        Scan scan = new Scan();
        scan.setFilter(enameFilter);
        executeScan(scan);
    }

    @Test
    public void testFamilyFilter() throws IOException {
        RegexStringComparator comparator = new RegexStringComparator("f1");
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, comparator);
        Scan scan = new Scan();
        scan.setFilter(familyFilter);
        executeScan(scan);
    }

    @Test
    public void testQualifierFilter() throws IOException {
        SubstringComparator comparator = new SubstringComparator("e");
        QualifierFilter filter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, comparator);
        Scan scan = new Scan();
        scan.setFilter(filter);
        executeScan(scan);
    }

    @Test
    public void testColumnPrefixFilter() throws IOException {
        ColumnPrefixFilter filter = new ColumnPrefixFilter("p".getBytes());
        Scan scan = new Scan();
        scan.setFilter(filter);
        executeScan(scan);
    }

    @Test
    public void testMultipleColumnPrefixFilter() throws IOException {
        byte[][] bytes = new byte[][]{"p".getBytes(), "a".getBytes()};
        MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(bytes);
        Scan scan = new Scan();
        scan.setFilter(filter);
        executeScan(scan);
    }

    @Test
    public void testColumnRangeFilter() throws IOException {
        ColumnRangeFilter rangeFilter = new ColumnRangeFilter("ename".getBytes(), true, "province".getBytes(), true);
        Scan scan = new Scan();
        scan.setFilter(rangeFilter);
        executeScan(scan);
    }

    @Test
    public void testRowFilter() throws IOException {
        RegexStringComparator comparator = new RegexStringComparator("0010");
        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, comparator);
        Scan scan = new Scan();
        scan.setFilter(filter);
        executeScan(scan);
    }

    @Test
    public void testFirstkeyOnlyFilter() throws IOException {
        FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter();
        Scan scan = new Scan();
        scan.setFilter(firstKeyOnlyFilter);
        executeScan(scan);
    }

    @Test
    public void testPageFilter() throws IOException {
        PageFilter pageFilter = new PageFilter(20);
        Scan scan = new Scan();
        scan.setFilter(pageFilter);
        while (true) {
            System.out.println("-------------------------");
            ResultScanner scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            byte[] maxRow = new byte[]{};
            int count = 0;
            while (iterator.hasNext()) {
                Result result = iterator.next();
                HbaseUtil.printCell(result);
                maxRow = result.getRow();   // remember the last rowkey of this page
                count++;
            }
            // fewer than 20 rows means this was the last page
            if (count < 20) {
                break;
            }
            // start the next page just after the last rowkey of the current page
            scan.setStartRow((new String(maxRow) + "\001").getBytes());
        }
    }

    public void executeScan(Scan scan) throws IOException {
        ResultScanner scanner = table.getScanner(scan);
        Iterator<Result> iterator = scanner.iterator();
        while (iterator.hasNext()) {
            Result result = iterator.next();
            HbaseUtil.printCell(result);
        }
    }
}
HBase storage mechanism
An HBase table consists of one or more regions, and each region is a directory on HDFS. HBase stores data by column family, so a region can contain several column families, and each column family is again a directory on HDFS. Within a column family (a store), the memstore produces files over time: a flush is triggered roughly when an hour has passed, when a memstore reaches 128 MB, or when all memstores together occupy 40% of the regionserver's heap. The flushed StoreFiles are written to HDFS in HFile format under the column family's directory. When the number of StoreFiles exceeds a threshold they are compacted; compaction applies deletes and updates and re-sorts the data. The data inside each StoreFile is sorted, but different StoreFiles are not necessarily sorted relative to one another, which is why compaction sorts again. In memory, the store only needs to keep index information about the raw data on HDFS.
A region is a table, or part of a table, with its own rowkey range; it is the basic storage structure (in the storage model it is an in-memory Java object). A region manages data in two places:
- In memory: the data is sorted first by rowkey (lexicographically), then by column key, then by timestamp in descending order.
- On HDFS: the data is stored as files (StoreFiles). These files are flushed from the memstore, so each file is sorted internally, but the files are not necessarily sorted relative to each other.
Because a region may contain a very large number of cells with different meanings, its data is divided into column families, and each column family corresponds to a store (also an in-memory object). A store manages cells of the same kind: they first accumulate in the store's memstore and are flushed into StoreFiles once a threshold is hit. As StoreFiles pile up, the store object keeps index information about them; too many StoreFiles are hard to manage, so a compaction mechanism merges them into a single file. If a StoreFile grows too large, the load across regions and regionservers becomes unbalanced, so there is also a split mechanism: at the file level the data is split starting from some rowkey so that the resulting files are as even in size as possible (which may force other files to split as well); at the macro level this is a region split.
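The flush thresholds above are cluster-wide defaults; per table they can also be tuned through the same descriptor API used in the TableDDL section. Here is a minimal sketch of that idea (the 64 MB value, the class name and the reuse of the ns1:xiao table and HbaseUtile helper are illustrative assumptions, not part of the original example):

import com.xx.HbaseAPI.Utile.HbaseUtile;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;

// Lower the memstore flush threshold for a single table via its table descriptor.
public class FlushSizeDemo {
    public static void main(String[] args) throws Exception {
        Admin admin = HbaseUtile.getAdmin();
        TableName tn = TableName.valueOf("ns1:xiao");
        HTableDescriptor desc = admin.getTableDescriptor(tn);
        desc.setMemStoreFlushSize(64 * 1024 * 1024L);  // flush this table at 64 MB instead of the 128 MB default
        admin.disableTable(tn);
        admin.modifyTable(tn, desc);
        admin.enableTable(tn);
        HbaseUtile.closeAdmin(admin);
    }
}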
HBase addressing flow
1. The client first asks ZooKeeper for the location of the hbase:meta table.
2. With that location, the client contacts the regionserver that hosts meta, reads the meta table, and finds out which regionserver hosts the region it needs.
3. With the region's address, the client contacts that regionserver directly to read or write.
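Step 2 can be reproduced with the same client API by scanning the hbase:meta catalog table yourself. A small sketch reusing the HbaseUtile helper from earlier (the class name and output format are illustrative):

import com.xx.HbaseAPI.Utile.HbaseUtile;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Print which regionserver hosts each region by reading hbase:meta directly.
public class MetaLookupDemo {
    public static void main(String[] args) throws Exception {
        Table meta = HbaseUtile.getTable("hbase:meta");
        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "server".getBytes()); // host:port of the hosting regionserver
        ResultScanner scanner = meta.getScanner(scan);
        for (Result r : scanner) {
            System.out.println(Bytes.toString(r.getRow()) + " -> "
                    + Bytes.toString(r.getValue("info".getBytes(), "server".getBytes())));
        }
        scanner.close();
        HbaseUtile.closeTable(meta);
    }
}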
HBase read/write flow
Read flow:
1. The client locates the store in the target region via the addressing flow.
2. A store maintains two memory areas: the memstore, used for writing, and the BlockCache, used for reading.
3. On a read, the BlockCache is checked first, then the memstore, and finally HDFS; data found on HDFS is put into the BlockCache so that the next read of the same data is much faster.
Write flow:
1. The client locates the store in the target region via the addressing flow.
2. The data is written into that store's memstore (it is written to the HLog/WAL before it goes into the memstore).
3. When a memstore hits one of the flush conditions (one hour has passed, it reaches 128 MB, or all memstores together occupy 40% of the regionserver's heap), it is flushed to a StoreFile.
4. When the number of StoreFiles reaches a threshold, they are compacted.
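One practical consequence of the BlockCache behaviour in step 3 of the read flow: whether a scan populates the cache can be controlled per scan, and one-off full scans commonly skip it so they do not evict hot data. A minimal sketch (the class and method names are only illustrative):

import org.apache.hadoop.hbase.client.Scan;

public class CacheBlocksDemo {
    public static Scan fullScanWithoutCaching() {
        Scan scan = new Scan();
        scan.setCacheBlocks(false);  // blocks read by this scan are not kept in the BlockCache
        return scan;
    }
}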
Bloom filter
A Bloom filter maintains a byte array (64 KB in this description) and several hash functions (in the spirit of hashCode()). When an element is stored, each hash function produces an int that is used as an index into the array; if any of those positions still holds 0, the element has not been stored before. After the element is stored, the positions at those indices are set to 1. When an element is looked up, the same hash functions are applied and the corresponding positions are checked: if every position holds 1, the element may be in the set; if any position holds 0, the element is definitely not in the set.
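To make the mechanism concrete, here is a minimal, self-contained sketch of the idea. The bit-array size, the three hash seeds and the class name are illustrative choices, not HBase's actual implementation:

import java.util.BitSet;

// A toy Bloom filter: k hash functions map an element to k positions in a bit array.
public class SimpleBloomFilter {
    private static final int SIZE = 64 * 1024 * 8;    // 64 KB expressed in bits
    private static final int[] SEEDS = {7, 31, 131};  // one seed per hash function
    private final BitSet bits = new BitSet(SIZE);

    private int hash(String value, int seed) {
        int h = 0;
        for (int i = 0; i < value.length(); i++) {
            h = seed * h + value.charAt(i);
        }
        return Math.abs(h % SIZE);                    // map the hash into the bit array
    }

    public void add(String value) {
        for (int seed : SEEDS) {
            bits.set(hash(value, seed));              // set every computed position to 1
        }
    }

    public boolean mightContain(String value) {
        for (int seed : SEEDS) {
            if (!bits.get(hash(value, seed))) {
                return false;                         // one 0 bit => definitely absent
            }
        }
        return true;                                  // all 1 bits => possibly present
    }

    public static void main(String[] args) {
        SimpleBloomFilter bf = new SimpleBloomFilter();
        bf.add("rk000001");
        System.out.println(bf.mightContain("rk000001"));  // true
        System.out.println(bf.mightContain("rk999999"));  // very likely false
    }
}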
Homework
Data:
select * from t_user where uname like '%l%' and sal > 2000
Use the HBase Java API to reproduce the effect of the SQL query above.
package com.xx.HbaseAPI;

import com.xx.HbaseAPI.Utile.HbaseUtile;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;

public class filterHomeWork {
    Table table;

    @Before
    public void getTable() {
        table = HbaseUtile.getTable("default:xiao");
    }

    @After
    public void closeTable() {
        HbaseUtile.closeTable(table);
    }

    @Test
    public void homeWork() throws IOException {
        // sal > 2000: values are stored as strings, so GREATER compares bytes lexicographically;
        // this works here because all generated salaries have the same number of digits
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
                "f1".getBytes(), "sal".getBytes(), CompareFilter.CompareOp.GREATER, "2000".getBytes());
        // uname like '%l%': regex comparator matching any name that contains the letter l
        RegexStringComparator regex = new RegexStringComparator("l");
        SingleColumnValueFilter filter2 = new SingleColumnValueFilter(
                "f1".getBytes(), "uname".getBytes(), CompareFilter.CompareOp.EQUAL, regex);
        // MUST_PASS_ALL = AND, matching the "and" in the SQL statement
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filter1.setFilterIfMissing(true);
        filter2.setFilterIfMissing(true);
        filterList.addFilter(filter1);
        filterList.addFilter(filter2);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner scanner = table.getScanner(scan);
        Iterator<Result> iterator = scanner.iterator();
        while (iterator.hasNext()) {
            Result next = iterator.next();
            CellScanner cellScanner = next.cellScanner();
            while (cellScanner.advance()) {
                Cell current = cellScanner.current();
                System.out.println(new String(CellUtil.cloneRow(current)) + "\t"
                        + new String(CellUtil.cloneFamily(current)) + "\t"
                        + new String(CellUtil.cloneValue(current)));
            }
        }
    }
}
HBase and Hive integration
Why integrate
1. HBase's primary purpose is data storage.
2. Hive's primary purpose is data analysis.
3. Integrating the two makes it convenient to combine storage and analysis in one project.
4. Table data in HBase becomes visible in Hive, and table data in Hive becomes visible in HBase.
Hive-To-Hbase
1) First, create a table in Hive that HBase can see
create table if not exists hive2hbase (
    uid int,
    uname string,
    age int,
    sex string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    "hbase.columns.mapping"=":key,base_info:name,base_info:age,base_info:sex"
)
tblproperties (
    "hbase.table.name"="hive2hbase1"
);
2) With an older Hive version such as 1.2.1, the statement fails with the following error:
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V
The cause: hive-hbase-handler-1.2.1.jar was built against an HBase version that does not match the HBase it finds at runtime.
Hive 2.1.1 does not have this problem.
3) Workaround
- Download the handler's sources with Maven:
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-hbase-handler</artifactId>
    <version>1.2.1</version>
</dependency>
- Locate the source package in the local repository and unpack it
- Create an empty project in an IDE
- Copy the sources into src; they will show errors because they depend on the Hive and HBase jars
- Add all jars from the lib directories of Hive and HBase to the project's lib
- Rebuild the sources into a new hive-hbase-handler-1.2.1.jar and use it to overwrite the original jar in Hive's lib directory
- Recreate the table; it now succeeds
4) Test it
1. Insert a record from Hive:
insert into hive2hbase values(1, 'michael', 32, '男');
2. Insert a record from HBase:
put 'hive2hbase1', '2', 'base_info:age','33'
put 'hive2hbase1', '2', 'base_info:name','rock'
put 'hive2hbase1', '2', 'base_info:sex','女'
3. Query both sides and compare the results.
4. Load data
Load data into the hive2hbase table through a staging table; the staging table is created below, and a sketch of the load/insert step follows it.
create table if not exists temp_hivehbase (
uid int,
uname string,
age int,
sex string
)
row format delimited
fields terminated by ' ';
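The article stops at the staging table; a minimal sketch of the remaining load-and-copy step could look like this (the local file path /root/user.txt and its space-delimited layout matching temp_hivehbase are assumptions):

-- load the raw file into the staging table, then copy it into the HBase-backed table
load data local inpath '/root/user.txt' into table temp_hivehbase;

insert into table hive2hbase
select uid, uname, age, sex from temp_hivehbase;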
Hbase-To-Hive
1) Decide on the structure and data of the HBase table
hbase> create 'ns1:t1','f1'
put 'ns1:t1','rk00001','f1:name','zhaoyun'
put 'ns1:t1','rk00001','f1:age',23
put 'ns1:t1','rk00001','f1:gender','m'
put 'ns1:t1','rk00002','f1:name','zhenji'
put 'ns1:t1','rk00002','f1:age',24
put 'ns1:t1','rk00002','f1:gender','f'
put 'ns1:t1','rk00003','f1:name','貂蝉'
put 'ns1:t1','rk00003','f1:age',24
put 'ns1:t1','rk00003','f1:gender','f'
2) Create a Hive table that matches the HBase table's structure
drop table hbase2hive;

create external table if not exists hbase2hive (
    uid string,
    age int,
    name string,
    sex string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties (
    "hbase.columns.mapping"=":key,f1:age,f1:name,f1:gender"
)
tblproperties (
    "hbase.table.name"="ns1:t1"
);
3) Check the result in Hive
4) Notes
- When mapping HBase columns, either write :key explicitly or leave it out; :key is used by default.
- When the HBase table already exists, create the Hive table with the external keyword. If the corresponding data is deleted in HBase, it can no longer be queried from Hive.
- The column count and data types of the HBase table and the Hive table should match as closely as possible; Hive columns are matched to HBase columns by position, not by name.
- Hive, HBase, MySQL and similar systems can also be integrated through third-party tools.