Hbase02(hbase的API,过滤器,比较器,存储机制,寻址流程,读写流程,布隆过滤器,与hive的整合)

it2025-04-05  20

文章目录

hbase的API提取的工具类namespace 的DDLtable的DDLtable的CRUD过滤器单列过滤器SingleColumnValueFilter 结构过滤器FilterList KeyValue元数据过滤器1、FamilyFilter2、QualifierFilter3、ColumnPrefixFilter4、MultipleColumnPrefixFilter5、ColumnRangeFilter RowKey过滤器RowFilter Utility过滤器FirstKeyOnlyFilter 分页过滤器PageFilter 比较器列值比较器1、RegexStringComparator2、SubstringComparator3、BinaryPrefixComparator4、BinaryComparator 过滤器加比较器的整体代码hbase的存储机制hbase的寻址流程hbase的读写流程布隆过滤器homeworkhbase和hive的整合整合的原因Hive-To-HbaseHbase-To-Hive

hbase的API

提取的工具类

package com.xx.HbaseAPI.Utile; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import java.io.IOException; public class HbaseUtile { static Connection connection; static{ //获取配置文件对象,会主动加载类库中的jar包里的默认属性 Configuration conf = new Configuration(); //配置zookeeper集群的配置信息,必须使用主机名, conf.set("hbase.zookeeper.quorum", "qianfeng01:2181,qianfeng02:2181,qianfeng03:2181"); //发送请求,获取连接对象 try { connection = ConnectionFactory.createConnection(conf); } catch (IOException e) { e.printStackTrace(); } } /** * 获取admin对象 * @return */ public static Admin getAdmin(){ try { return connection.getAdmin(); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 关闭admin * @param admin */ public static void closeAdmin(Admin admin){ if(admin!=null){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 获取Table对象,用于表的增删改查 */ public static Table getTable(String tableName){ Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); } catch (IOException e) { e.printStackTrace(); } return table; } /** * 关闭Table,关闭表连接 */ public static void closeTable(Table table){ try { table.close(); } catch (IOException e) { e.printStackTrace(); } } public static void printCell(Result result) throws IOException { CellScanner cellScanner = result.cellScanner(); //使用迭代器遍历元素, advance()方法:和hasnext()类似,用来判断是否有下一个元素 while(cellScanner.advance()){ //调用current方法获取当前单元格 Cell current = cellScanner.current(); //使用单元格工具类将数据克隆出来 System.out.println(new String(CellUtil.cloneRow(current)) +"\t"+new String(CellUtil.cloneFamily(current))+"\t"+new String(CellUtil.cloneValue(current))); } } }

namespace 的DDL

package com.xx.HbaseAPI; import com.xx.HbaseAPI.Utile.HbaseUtile; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Map; import java.util.Set; public class NameSpaceDDL { static Admin admin; @Before public void getAdmin() { admin = HbaseUtile.getAdmin(); } @After public void closeAdmin() { HbaseUtile.closeAdmin(admin); } @Test public void CreateNamespace() throws IOException { NamespaceDescriptor ns1 = NamespaceDescriptor.create("ns1").build(); //使用admin对象使用命名空间描述器提交到hbase上 admin.createNamespace(ns1); } @Test public void listNamespace() throws IOException { NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors(); for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) { System.out.println(namespaceDescriptor.getName()); } } @Test public void listNamespaceTables() throws IOException { TableName[] ns1s = admin.listTableNamesByNamespace("ns1"); for (TableName ns1 : ns1s) { System.out.println(ns1.getNameAsString()); } } /** * 相当与shell的 describe_namespace_tables 'ns1' */ @Test public void describeNamespace() throws IOException { NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1"); Map<String, String> map = ns1.getConfiguration(); //System.out.println(map); Set<String> keys = map.keySet(); for (String key : keys) { System.out.println(key+"="+map.get(key)); } } /** * 相当于shell中的 alter_namespace 'ns1',{METHOND=>'set','city'=>'shenzhen','street'=>'hangcheng'} * @throws IOException */ @Test public void alterNamespaceAdd() throws IOException { NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1"); ns1.setConfiguration("city", "shenzhen"); ns1.setConfiguration("street", "hangcheng"); admin.modifyNamespace(ns1); } /** * 相当于shell中的 alter_namespace 'ns1',{METHOND=>'set','city'=>'datong','street'=>'xxx'} */ @Test public void alterNamespaceModify() throws IOException { NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1"); ns1.setConfiguration("city", "datong"); ns1.setConfiguration("street", "xxx"); admin.modifyNamespace(ns1); } /** * 相当于shell中的 alter_namespace 'ns1',{METHOND=>'unset',NAME=>'city'} */ @Test public void alterNamespaceRemove() throws IOException { NamespaceDescriptor ns1 = admin.getNamespaceDescriptor("ns1"); ns1.removeConfiguration("city"); admin.modifyNamespace(ns1); } /** * 相当于shell的 drop_namespace 'ns1' 只能删除空的命名空间 */ @Test public void dropNamespace() throws IOException { admin.deleteNamespace("ns1"); } }

table的DDL

package com.xx.HbaseAPI; import com.xx.HbaseAPI.Utile.HbaseUtile; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; public class TableDDL { Admin admin; @Before public void getAdmin(){ admin=HbaseUtile.getAdmin(); } @After public void closeAdmin(){ HbaseUtile.closeAdmin(admin); } /** * create 'ns1:xiao','f1' * @throws IOException */ @Test public void createTable() throws IOException { //admin想要创建一张表需要一个表描述器,这里new一个表描述器对象并传入表名 HTableDescriptor xiao = new HTableDescriptor(TableName.valueOf("ns1:xiao")); //创建表必须指定最少一个列族,这里获取一个列族描述对象 HColumnDescriptor f1 = new HColumnDescriptor("f1"); //设置表的属性, f1.setBloomFilterType(BloomType.ROWCOL); f1.setMaxVersions(5); f1.setTimeToLive(3600*24); //将列族描述器绑定到表描述器上 xiao.addFamily(f1); //提交到hbase中 admin.createTable(xiao); } /** * desc|describe 'ns1:xiao' */ @Test public void deacribeTable() throws IOException { //要描述一张表(desc表也就是desc表中的列族)所以需要得到表描述器对象,而列描述器对象需要从表描述器中的到 //获得表描述器 HTableDescriptor ns1 = admin.getTableDescriptor(TableName.valueOf("ns1:xiao")); //通过表描述器得到列族描述器 HColumnDescriptor[] columnFamilies = ns1.getColumnFamilies(); //遍历表描述器对象即能得到列族信息 for (HColumnDescriptor columnFamily : columnFamilies) { System.out.println(columnFamily.getMaxVersions()); System.out.println(columnFamily.getTimeToLive()); System.out.println(columnFamily.getNameAsString()); System.out.println(columnFamily.isBlockCacheEnabled());//是否支持块缓存 } } /** * alter 'ns1:xiao','f2','f3','f4' */ @Test public void alterTableAppendColumnFamily() throws IOException { //获取表描述对象用来绑定新创建的列描述器对象(添加列族,需要新创建列描述器对象) HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("ns1:xiao")); //创建类描述器对象 HColumnDescriptor f2 = new HColumnDescriptor("f2"); HColumnDescriptor f3 = new HColumnDescriptor("f3"); HColumnDescriptor f4 = new HColumnDescriptor("f4"); //可以通过列描述器对象设置列的属性,不设置即使用默认值属性 f2.setTimeToLive(3600*24*7); f2.setMaxVersions(5); f2.setBloomFilterType(BloomType.ROWCOL); //使用admin对象的addColumn()方法,需要提供一个TableName对象和列描述器对象 admin.addColumn(TableName.valueOf("ns1:xiao"),f2); admin.addColumn(TableName.valueOf("ns1:xiao"),f3); admin.addColumn(TableName.valueOf("ns1:xiao"),f4); } /** * alter 'ns1:xiao',{NAME=>'f2',VERSIONS=>3} */ @Test public void alterTableModifyColumnFamily() throws IOException { //创建一个新的和要修改的列族一样的名字的列描述器 HColumnDescriptor f2 = new HColumnDescriptor("f2"); //设置属性 f2.setMaxVersions(3); //使用admin对象来修改列族属性需要一个TableName对象和一个列描述器 //使用admin对象提交到hbase上,因为参数中有表名,所以不用和表描述器绑定 admin.modifyColumn(TableName.valueOf("ns1:xiao"),f2); } /** *alter 'ns1:xiao','delete'=>'f3' 删除列族 */ @Test public void alterTableDeleteColumnFamily() throws IOException { admin.deleteColumn(TableName.valueOf("ns1:xiao"), Bytes.toBytes("f2")); } /** * list 列出所有的表 */ @Test public void listAllTable() throws IOException { //admin对象提供了列出所有表的方法,列出所有的表,所以不用指定namespace TableName[] tableNames = admin.listTableNames(); for (TableName tableName : tableNames) { System.out.println(tableName.getNameAsString()); } } /** * 删除表前要先禁用表 * disable 'ns1:xiao' * drop 'ns1:xiao' */ @Test public void dropTable() throws IOException { TableName tableName = TableName.valueOf("ns1:xiao"); //应该先判断表是否存在,然后判断表是否禁用,最后才删除 if(admin.tableExists(tableName)){ //禁用表 admin.disableTable(tableName); //删除表 admin.deleteTable(tableName); } } }

table的CRUD

package com.xx.HbaseAPI; import com.xx.HbaseAPI.Utile.HbaseUtile; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class TableCRUD { Table table; @Before public void getTable(){ table = HbaseUtile.getTable("ns1:xiao"); } @After public void closeTable(){ HbaseUtile.closeTable(table); } /** * put 'ms1:xiao','rk0001','f1:name','zhang' */ @Test public void putData() throws IOException { //想要上传数据要获得一个put对象 Put put = new Put(Bytes.toBytes("rk000001")); //通过put对象设置对应的属性 put.addColumn(Bytes.toBytes("f1"), ("name").getBytes(), ("zhang").getBytes()); table.put(put); } /** * 批量上传数据 */ @Test public void putBatchData() throws IOException { //批量上传数据需要用到list集合 List<Put> puts=new ArrayList<>(); for (int i = 0; i < 1000; i++) { String rowkey=""; if(i<10){ rowkey="rk00000"+i; }else if(i<100){ rowkey="rk0000"+i; }else{ rowkey="rk000"+i; } Put put = new Put(rowkey.getBytes()); //需要绑定单元格 put.addColumn("f1".getBytes(), "name".getBytes(),("xin"+i).getBytes() ); put.addColumn("f1".getBytes(), "age".getBytes(), ((int)(Math.random()*100)+"").getBytes()); String sex; if(Math.random()*2==1){ sex="f"; }else sex="m"; put.addColumn("f1".getBytes(), "sex".getBytes(), sex.getBytes()); put.addColumn("f1".getBytes(), "sal".getBytes(), ((int)(Math.random()*9000+1000)+"").getBytes()); //将put对象添加到list集合中 puts.add(put); //提交 table.put(puts); } } /** * 获取单行的数据 * get 'ns1:xiao','rk000001' */ @Test public void getSingleRowData() throws IOException { //使用get对象描述一行记录 Get get = new Get(Bytes.toBytes("rk000001")); //使用table对象将get发送到hbase上,并返回查询的数据 ,返回的数据被封装成一个result对象 Result result = table.get(get); //获取结果集result的单元格扫描器,底层就是一个迭代器 CellScanner cellScanner = result.cellScanner(); //使用迭代器遍历元素, advance()方法:和hasnext()类似,用来判断是否有下一个元素 while(cellScanner.advance()){ //调用current方法获取当前单元格 Cell current = cellScanner.current(); //使用单元格工具类将数据克隆出来 System.out.println(new String(CellUtil.cloneRow(current)) +"\t"+new String(CellUtil.cloneFamily(current))+"\t"+new String(CellUtil.cloneValue(current))); } } /** * 获取多行的数据 */ @Test public void getMultipleData() throws IOException { //定义一个集合 List<Get> gets=new ArrayList<>(); //添加要查询的对象 gets.add(new Get("rk000004".getBytes())); gets.add(new Get("rk000019".getBytes())); gets.add(new Get("rk000003".getBytes())); gets.add(new Get("rk000007".getBytes())); //使用table对象将get发送到hbase上,并返回查询的数据 ,返回的数据被封装成一个result对象 Result[] results = table.get(gets); //使用自定义工具类打印信息 for (Result result : results) { HbaseUtile.printCell(result); } } /** * 使用scan对象获取多行数据 shell 语法: scan 'ns1:xiao',{LMITI=>10,STARTROW=>'rk000001'} * 如果rowkey的长度不一致,可以再要查询的endRowkey后面加一个非常小的ascii码,如:\001,\002 * 比如: * * ........ * rk 000009 * rk 000010+\001 * rk 00001011 * rk 000011 */ @Test public void scanData() throws IOException { //使用scan对象描述要查询的数据的范围 Scan scan = new Scan("rk000001".getBytes(), ("rk000010\001").getBytes()); ResultScanner scanner = table.getScanner(scan); //通过得到的扫描器得到迭代器 Iterator<Result> iterator = scanner.iterator(); while(iterator.hasNext()){ Result next = iterator.next(); //调用打印方法打印数据 HbaseUtile.printCell(next); } } /** *删除单元格: delete 'ns1:xiao','rk000001','f1:name' * */ @Test public void deleteCell() throws IOException { //获取Delete对象 Delete delete = new Delete("rk000001".getBytes()); //在指定要删除的具体单元格 delete.addColumn("f1".getBytes(), "name".getBytes()); //提交 //要删除就要传入一个Delete类型的对象 table.delete(delete); } @Test public void deleteRow() throws IOException { //获取Delete对象 Delete delete = new Delete("rk000001".getBytes()); //提交 table.delete(delete); } }

过滤器

为什么使用过滤器:没有过滤器的话,hbase只能查询整张表或者行范围的查询,不能使用其他条件进行筛选,过滤。

单列过滤器

SingleColumnValueFilter

结构过滤器

FilterList

KeyValue元数据过滤器

1、FamilyFilter

2、QualifierFilter

3、ColumnPrefixFilter

4、MultipleColumnPrefixFilter

5、ColumnRangeFilter

RowKey过滤器

RowFilter

Utility过滤器

FirstKeyOnlyFilter

分页过滤器

PageFilter

比较器

列值比较器

1、RegexStringComparator

2、SubstringComparator

3、BinaryPrefixComparator

4、BinaryComparator

过滤器加比较器的整体代码

package com.qf.hbase.api; import com.qf.hbase.util.HbaseUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Iterator; public class FilterDemo { Table table = null; @Before public void getTable(){ table = HbaseUtil.getTable("ns1:emp"); } @After public void closeTable(){ HbaseUtil.closeTable(table); } /** * select * from emp where ename = 'zhaoyun100' * 单列值过滤器 */ @Test public void testSingleColumnValueFilter() throws IOException { /** * 使用单列值过滤器指定要查询的条件 * 构造器SingleColumnValueFilter(byte[] family,byte[] key,CompareOp compareOp,byte[] value) * * CompareFilter.CompareOp.EQUAL 等于 * CompareFilter.CompareOp.GREATER 大于 * CompareFilter.CompareOp.GREATER_OR_EQUAL 大于等于 * CompareFilter.CompareOp.LESS 小于 * CompareFilter.CompareOp.LESS_OR_EQUAL 小于等于 */ SingleColumnValueFilter filter = new SingleColumnValueFilter( "f1".getBytes(),"ename".getBytes(), CompareFilter.CompareOp.EQUAL,"zhaoyun100".getBytes()); /** * 过滤器在进行过滤时,如果遍历到某一行时,如果有此列,就会比较值,当满足条件时,该行的所有单元都会返回 * 如果这一行没有此列,会被认为满足条件,并返回。 这种情况不符合业务中的需求,因此需要过滤掉 * * 使用setFilterIfMissing(boolean label)方法过滤缺失的字段 * 当label为true时,表示过滤掉缺失字段 */ filter.setFilterIfMissing(true); Scan scan = new Scan(); scan.setFilter(filter); executeScan(scan); } /** * select * from emp where ename = 'zhaoyun10' or age = 23; * * */ @Test public void testFilterList() throws IOException { //第一步:应该使用两个单列值过滤器对象分别绑定一个条件 SingleColumnValueFilter enameFilter = new SingleColumnValueFilter( "f1".getBytes(),"ename".getBytes(), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("zhaoyun10")); SingleColumnValueFilter ageFilter = new SingleColumnValueFilter( "f1".getBytes(),"age".getBytes(), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("23")); enameFilter.setFilterIfMissing(true); ageFilter.setFilterIfMissing(true); //第二步:使用结构过滤器,绑定两个条件 /** * FilterList.Operator.MUST_PASS_ONE 相当于 or ,只要满足一个条件即可 那么多条件的话,如果没有设置缺失字段过滤,只要有一个字段缺失,就认为满足条件,并返回 * FilterList.Operator.MUST_PASS_ALL 相当于 and 必须满足所有的条件才可以,那么多条件的话,如果没有设置缺失字段过滤,条件中的所有字段都缺失,才会认为满足条件,并返回 */ FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); filterList.addFilter(enameFilter); filterList.addFilter(ageFilter); //第三步:使用Scan对象进行查询 Scan scan = new Scan(); scan.setFilter(filterList); executeScan(scan); } /** * 学习一下四个常用的比较器,因为比较器可以进行设置模糊匹配 * * RegexStringComparator * SubstringComparator * BinaryPrefixComparator * BinaryComparator * */ @Test public void testComparator() throws IOException { //正则字符串比较器的应用 //RegexStringComparator comparator = new RegexStringComparator("10$"); //子串比较器 //SubstringComparator comparator = new SubstringComparator("yun10"); //二进制前缀比较器 //BinaryPrefixComparator comparator = new BinaryPrefixComparator("zhaoyun101".getBytes()); //二进制比较器 BinaryComparator comparator = new BinaryComparator("zhaoyun10".getBytes()); //使用单列值比较器,设置条件 SingleColumnValueFilter enameFilter = new SingleColumnValueFilter( "f1".getBytes(),"ename".getBytes(), CompareFilter.CompareOp.EQUAL,comparator); enameFilter.setFilterIfMissing(true); Scan scan = new Scan(); scan.setFilter(enameFilter); executeScan(scan); } /** * KeyValue过滤器之一FamilyFilter * 相当于select ColumnFamily.* from ...... */ @Test public void testFamilyFilter() throws IOException { //正则字符串比较器的应用 RegexStringComparator comparator = new RegexStringComparator("f1"); FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL,comparator); Scan scan = new Scan(); scan.setFilter(familyFilter); executeScan(scan); } /** * KeyValue过滤器之一QualifierFilter * * 相当于 select colName1,colname2,...... from ...... * * ename,age,gender,salary province city */ @Test public void testQualifierFilter() throws IOException { //使用子串比较器 SubstringComparator comparator = new SubstringComparator("e"); QualifierFilter filter = new QualifierFilter(CompareFilter.CompareOp.EQUAL,comparator); Scan scan = new Scan(); scan.setFilter(filter); executeScan(scan); } /** * KeyValue过滤器之一ColumnPrefixFilter * * 相当于 select colName1,colname2,...... from ...... */ @Test public void testColumnPrefixFilter() throws IOException { ColumnPrefixFilter filter = new ColumnPrefixFilter("p".getBytes()); Scan scan = new Scan(); scan.setFilter(filter); executeScan(scan); } /** *KeyValue过滤器之一MultipleColumnPrefixFilter * * 相当于 select colName1,colname2,...... from ...... */ @Test public void testMultipleColumnPrefixFilter() throws IOException { //创建一个二维数组 byte[][] bytes = new byte[][]{"p".getBytes(),"a".getBytes()}; MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(bytes); Scan scan = new Scan(); scan.setFilter(filter); executeScan(scan); } /** *KeyValue过滤器之一ColumnRangeFilter * * 相当于 select colName1,colname2,...... from ...... * * * age,city,ename,gender,province,salary */ @Test public void testColumnRangeFilter() throws IOException { ColumnRangeFilter rangeFilter = new ColumnRangeFilter("ename".getBytes(),true,"province".getBytes(),true); Scan scan = new Scan(); scan.setFilter(rangeFilter); executeScan(scan); } /** * 行键过滤器 * 相当于 select * from where 主键字段 liek ’‘ */ @Test public void testRowFilter() throws IOException { //使用正则比较器 RegexStringComparator comparator = new RegexStringComparator("0010"); RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,comparator); Scan scan = new Scan(); scan.setFilter(filter); executeScan(scan); } /** * Utility过滤器 * FirstkeyOnlyFilter 返回的是每一行的第一个单元格 */ @Test public void testFirstkeyOnlyFilter() throws IOException { FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter(); Scan scan = new Scan(); scan.setFilter(firstKeyOnlyFilter); executeScan(scan); } /** * 分页过滤器 * select * from limit (page-1)*pageSize,pageSize */ @Test public void testPageFilter() throws IOException { //使用分页过滤器,规定每页有多少条记录 PageFilter pageFilter = new PageFilter(20); //绑定到scan对象上 Scan scan = new Scan(); scan.setFilter(pageFilter); //不知道有多少条记录,所有使用死循环 while(true){ System.out.println("-------------------------"); ResultScanner scanner = table.getScanner(scan); Iterator<Result> iterator = scanner.iterator(); // 统计当前页的最大行号 byte[] maxRow =new byte[]{}; //定义一个计数器,用于统计当前页的行数 int count = 0; while(iterator.hasNext()){ Result result = iterator.next(); HbaseUtil.printCell(result); //存储这一行的行号 maxRow = result.getRow(); //计数 count++; } //如果当前页不满足20条,说明是最后一页。可以结束循环 if(count<20){ break; } //要为下一页的查询做准备 scan.setStartRow((new String(maxRow)+"\001").getBytes()); } } /** * 临时封装的执行Scan的逻辑 * @param scan * @throws IOException */ public void executeScan(Scan scan) throws IOException { ResultScanner scanner = table.getScanner(scan); Iterator<Result> iterator = scanner.iterator(); while (iterator.hasNext()){ Result result = iterator.next(); HbaseUtil.printCell(result); } } }

hbase的存储机制

hbase的一张表是一个region或者多个region,而一个region就是hdfs上的一个目录,hbase是面向列式存储的,所以一个region中可以有多个列族,一个列族也是hdfs上的一个目录,而列族(store)中通过memstore会产生多个文件(1小时、满128M、所有memstore占用regionserver整体内存的40%),而产生的storeFile将以HFile的格式存储再hdfs上,也就是列族对应的目录下,当storeFile的个数超过阈值时,会进行合并,再合并时会进行删除,修改,排序等操作。 在hbase中每个storeFile中的数据是有序的,但每个storeFile之间不一定是有序的,所以合并之后会再次进行排序。 store在内存中只需要维护hdfs上的原始数据的索引就可以了


region是hbase中的表或者是表的一部分,region有自己的rowkey范围。也就是基本的存储结构(存储模型,是内存中的java对象)region管理的数据有两部分, 一部分在内存中,内存中的数据是先按照rowkey字典排序,然后再key字典排序,之后再时间戳降序排序另一部分是以文件(storefile)的形式存储在hdfs上。文件中的数据从memstore中flush而来,因此也有序,不过文件之间不一定有序 region所要管理的数据可能由于单元格过多,或者是意义不同,分为列族进行管理,一个列族对应一个store(也是内存中的对象)一个store就管理着相同意义的单元格,这些单元格在对应的memstore中存储,但是由于阈值,最终会flush成storefile。storefile的个数越来越多,store(对象)管理着这些文件的索引信息。storefile的个数太多的话,也不好管理,因此会合并机制,合并成一个文件如果一个storefile过大,region和regionserver的负载不均衡,因此会有切分机制,细节上是切分文件(从某一个rowkey开始切分,文件大小尽可能均分,也可能造成其他的文件的切分),宏观上是region的切分。

hbase的寻址流程

1、client先访问zookeeper获取到meta表的存储位置 2、得到meta表的存储位置后,client访问该reginserver,并且读取meta表,查找要访问的region再哪个regionserver上 3、得到region得地址后,client访问该region进行读写操作即可

hbase的读写流程

读流程: 1、client通过寻址流程找到对应得region中的store 2、store维护了两个内存memstore用于写数据和BlockCache用于读数据 3、当读数据时,会先到BlockCache中查数据,之后到memstore中查,最后到hdfs上查,查到后会将查找出来的数据放入BlockCache中,当下次读取该数据时,速度会快很多。 写流程: 1、client通过寻址流程找到对应得region中的store 2、向对应的region中的对应store的memstore中写数据(写入memstore之前会先写到Hlog中) 3、当memstore达到条件:1小时、满128M、所有memstore占用 regionserver整体内存的40%时会flush为一个storeFile文件 4、当storeFile达到一定数量时会进行合并

布隆过滤器

布隆过滤器内部维护了一个64KB大小的字节数组,和若干格haseCode()方法,当存储元素时,会调用若干个haseCode()方法计算出int值,用来当作index,查看对应的index上的值是否为0,只要有一个是0,则说明该元素没有被存储,然后在存储后,将数组对应的index上填入1。 当查询元素时,也会调用若干个haseCode()方法计算出int值,然后查看数组中对应的index上的值,如果所有的index上的值都为1,则这个元素可能存在,如果有一个为0,则说明此元素一定不存在集合中

homework

数据:

select * from t_user where uname like ‘%l%’ and sal > 2000

使用hbase JAVA API完成以上sql的查询效果

package com.xx.HbaseAPI; //select * from t_user where uname like '%l%' and sal > 2000 //使用hbase JAVA API完成以上sql的查询效果 // //使用了两个单列值过滤器设置两个条件,然后设置一个正则比较器模糊查询,并绑定到对应的过滤器上,然后将两个 //单列值过滤器绑定到结构过滤器上 import com.xx.HbaseAPI.Utile.HbaseUtile; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.*; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Iterator; public class filterHomeWork { Table table; @Before public void getTable() { table = HbaseUtile.getTable("default:xiao"); } @After public void closeTable() { HbaseUtile.closeTable(table); } @Test public void homeWork() throws IOException { //定义一个单列值过滤器 SingleColumnValueFilter filter1 = new SingleColumnValueFilter("f1".getBytes(), "sal".getBytes(), CompareFilter.CompareOp.GREATER, "2000".getBytes()); //定义一个正则比较器,用于模糊查询 RegexStringComparator regex = new RegexStringComparator("l"); //定义一个单列值过滤器,绑定比较器 SingleColumnValueFilter filter2 = new SingleColumnValueFilter("f1".getBytes(), "uname".getBytes(), CompareFilter.CompareOp.EQUAL, regex); //定义一个结构过滤器,绑定两个单列值过滤器 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); //设置空值过滤 filter1.setFilterIfMissing(true); filter2.setFilterIfMissing(true); //绑定 filterList.addFilter(filter1); filterList.addFilter(filter2); //使用scan对象进行查询 Scan scan = new Scan(); scan.setFilter(filterList); //提交到hbase上 ResultScanner scanner = table.getScanner(scan); Iterator<Result> iterator = scanner.iterator(); while (iterator.hasNext()) { Result next = iterator.next(); CellScanner cellScanner = next.cellScanner(); //使用迭代器遍历元素, advance()方法:和hasnext()类似,用来判断是否有下一个元素 while (cellScanner.advance()) { //调用current方法获取当前单元格 Cell current = cellScanner.current(); //使用单元格工具类将数据克隆出来 System.out.println(new String(CellUtil.cloneRow(current)) + "\t" + new String(CellUtil.cloneFamily(current)) + "\t" + new String(CellUtil.cloneValue(current))); } } } }

hbase和hive的整合

整合的原因

1. HBase的最主要的目的是做数据存储 2. Hive的最主要作用是做数据分析 3. 整合的目的是为了方便的在项目中存储+分析 4. hbase中的表数据在hive中能看到,hive中的表数据在hbase中也能看到

Hive-To-Hbase

1) 首先在hive中创建hbase能看到的表

create table if not exists hive2hbase ( uid int, uname string, age int, sex string ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties( "hbase.columns.mapping"=":key,base_info:name,base_info:age,base_info:sex" ) tblproperties( "hbase.table.name"="hive2hbase1" );

2) 如果hive是低版本的,如1.2.1,那么应该报以下错误

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V 原因是:依赖于hive-hbase-handler-1.2.1.jar,而找到的hbase中的版本匹配不一致造成的。 hive-2.1.1不会出现以上错误

3) 解决办法

- 可以使用maven 下载此包 <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-hbase-handler</artifactId> <version>1.2.1</version> </dependency> - 去本地仓库中找到源码包,解压 - 使用IDE,随便创建一个空项目 - 将源码包copy到src下,因为依赖hive和hbase的jar,因此有红叉 - 将hive和hbase的lib下的所有jar 放入此项目下的lib中 - 将源码重写打包生成hive-hbase-handler-1.2.1.jar,覆盖掉hive的lib下原有的jar包 - 然后再重写建表,就好了

4) 进行测试

1. 在hive中插入一条记录 insert into hive2hbase values(1, 'michael', 32, '男'); 2. 在hbase中插入一条记录 put 'hive2hbase1', '2', 'base_info:age','33' put 'hive2hbase1', '2', 'base_info:name','rock' put 'hive2hbase1', '2', 'base_info:sex','女' 3. 分别查看查询结果 4. 导入数据 借助中间表向hive2hbase表中导入数据。 create table if not exists temp_hivehbase ( uid int, uname string, age int, sex string ) row format delimited fields terminated by ' ';

Hbase-To-Hive

1) 确定hbase中的某一张表的表结构和表数据

hbase> create 'ns1:t1','f1' put 'ns1:t1','rk00001','f1:name','zhaoyun' put 'ns1:t1','rk00001','f1:age',23 put 'ns1:t1','rk00001','f1:gender','m' put 'ns1:t1','rk00002','f1:name','zhenji' put 'ns1:t1','rk00002','f1:age',24 put 'ns1:t1','rk00002','f1:gender','f' put 'ns1:t1','rk00003','f1:name','貂蝉' put 'ns1:t1','rk00003','f1:age',24 put 'ns1:t1','rk00003','f1:gender','f'

2) 根据hbase的表结构建立hive表

drop table hbase2hive; create external table if not exists hbase2hive( uid string, age int, name string, sex string ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties( "hbase.columns.mapping"=":key,f1:age,f1:name,f1:gender" ) tblproperties( "hbase.table.name"="ns1:t1" );

3) 在hive中查看结果

4) 注意事项

映射hbase的列时,要么就写:key,要么不写,默认使用:keyhbase中表存在的时候,在hive中创建表时应该使用external关键字。如果删除了hbase中对应的表数据,那么hive中就不能查询出来数据。hbase中的列和hive中的列个数和数据类型应该尽量相同,hive表和hbase表的字段不是按照名字匹配,而是按照顺序来匹配的。hive、hbase和mysql等可以使用第三方工具来进行整合。
最新回复(0)