SpringBoot使用Hbase
文章目录
- SpringBoot使用Hbase
- 一,引入依赖
- 二,配置文件添加自己的属性
- 三,配置类注入HBASE配置
- 四,配置Hbase连接池
- 五,配置操作服务类
一,引入依赖
org.apache.hbase hbase-client 2.3.2 org.slf4j slf4j-log4j12 二,配置文件添加自己的属性
hbase: zookeeper: quorum: 10.xxx.xx.153,10.xxx.xx.154,10.xxx.xx.155 property: clientPort: 2181 master: port: 9001
三,配置类注入HBASE配置
package com.hbase.config; import org.apache.hadoop.hbase.HBaseConfiguration; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ClassName: HBaseConfig * @author: Leemon * @Description: TODO * @date: 2023/4/12 18:06 * @version: 1.0 */ @Configuration @RefreshScope public class HBaseConfig { @Value("${hbase.zookeeper.quorum}") private String zookeeperQuorum; @Value("${hbase.zookeeper.property.clientPort}") private String clientPort; @Value("${hbase.master.port}") private String masterPort; @Bean public org.apache.hadoop.conf.Configuration hbaseConfiguration() { org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", zookeeperQuorum); conf.set("hbase.zookeeper.property.clientPort", clientPort); // 如果hbase是集群,这个必须加上 // 这个ip和端口是在hadoop/mapred-site.xml配置文件配置的 conf.set("hbase.master", zookeeperQuorum + ":" + masterPort); conf.set("hbase.client.keyvalue.maxsize", "20971520"); conf = HBaseConfiguration.create(conf); return conf; } }
四,配置Hbase连接池
这里没有使用懒加载模式,减少启动后第一次访问时访问时间过长
package com.hbase.config; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.io.IOException; import java.util.Enumeration; import java.util.Vector; /** * @ClassName: HbaseConnectionPool * @author: Leemon * @Description: TODO * @date: 2023/4/13 9:45 * @version: 1.0 */ @Component @Slf4j public class HbaseConnectionPool { /** * 连接池最大的大小 */ private int nMaxConnections = 20; /** * 连接池自动增加的大小 */ private int nIncrConnectionAmount = 3; /** * 连接池的初始大小 */ private int nInitConnectionAmount = 3; /** * 存放连接池中数据库连接的向量,初始时为null */ private Vector vcConnections = null; @Resource private Configuration hbaseConfiguration; @PostConstruct public void init() { try { vcConnections = new Vector(); createConnections(nInitConnectionAmount); } catch (Exception e) { e.printStackTrace(); } } public synchronized Connection getConnection() { // 确保连接池己被创建 if (vcConnections == null) { // 连接池还没创建,则返回null return null; } // 获得一个可用的数据库连接 Connection conn = getFreeConnection(); // 如果目前没有可以使用的连接,即所有的连接都在使用中 while (conn == null) { // 等一会再试 try { wait(250); } catch (InterruptedException e) { e.printStackTrace(); } // 重新再试,直到获得可用的连接,如果getFreeConnection()返回的为null,则表明创建一批连接后也不可获得可用连接 conn = getFreeConnection(); } // 返回获得的可用的连接 return conn; } /** * 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果 * 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置 * 的值创建几个数据库连接,并放入连接池中。 * 如果创建后,所有的连接仍都在使用中,则返回 null * @return * 返回一个可用的数据库连接 */ private Connection getFreeConnection() { // 从连接池中获得一个可用的数据库连接 Connection conn = findFreeConnection(); if (conn == null) { // 如果目前连接池中没有可用的连接 // 创建一些连接 try { createConnections(nIncrConnectionAmount); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); log.error("create new connection fail.", e); } // 重新从池中查找是否有可用连接 conn = findFreeConnection(); if (conn == null) { // 如果创建连接后仍获得不到可用的连接,则返回 null return null; } } return conn; } /** * 创建由 numConnections 指定数目的数据库连接 , 并把这些连接 * 放入 connections 向量中 * @param _nNumConnections 要创建的数据库连接的数目 * @throws Exception */ private void createConnections(int _nNumConnections) throws Exception { // 循环创建指定数目的数据库连接 for (int x = 0; x < _nNumConnections; x++) { // 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections // 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。 // 如果连接数己经达到最大,即退出。 if (this.nMaxConnections > 0 && this.vcConnections.size() >= this.nMaxConnections) { log.warn("已达到最大连接数,不能再增加连接"); throw new Exception("已达到最大连接数"+ nMaxConnections+",不能再增加连接"); } // 增加一个连接到连接池中(向量 connections 中) vcConnections.addElement(new ConnectionWrapper(newConnection())); log.info("HBase数据库连接己创建 ...... " + x); } } /** * 查找池中所有的連接,查找一个可用的數據庫連接, * 如果没有可用的連結,返回null * @return * 返回一個可用的數據庫連接 */ private Connection findFreeConnection() { Connection conn = null; ConnectionWrapper connWrapper = null; //獲得連接池向量中所有的對象 Enumeration enumerate = vcConnections.elements(); //遍歷所有的对象,看是否有可用的連接 while (enumerate.hasMoreElements()) { connWrapper = (ConnectionWrapper) enumerate.nextElement(); if (!connWrapper.isBusy()) { //如果此對象不忙,則獲得它的數據庫連接并把它設為忙 conn = connWrapper.getConnection(); connWrapper.setBusy(true); // 己经找到一个可用的連接,退出 break; } } // 返回找到的可用連接 return conn; } /** *创建一个新的数据库连接并返回它 * @return * 返回一个新创建的数据库连接 */ private Connection newConnection() { /** hbase 连接 */ Connection conn = null; // 创建一个数据库连接 try { conn = ConnectionFactory.createConnection(hbaseConfiguration); } catch (IOException e) { log.error("创建HBase数据库连接失败!"); e.printStackTrace(); } // 返回创建的新的数据库连接 return conn; } public synchronized void releaseConnection(Connection conn) { if (this.vcConnections == null) { log.info("连接池不存在,无法返回此连接到连接池中!!"); } else { ConnectionWrapper connWrapper = null; Enumeration enumerate = this.vcConnections.elements(); while(enumerate.hasMoreElements()) { connWrapper = (ConnectionWrapper) enumerate.nextElement(); if (conn == connWrapper.getConnection()) { connWrapper.setBusy(false); break; } } } } class ConnectionWrapper { /** * 数据库连接 */ private Connection connection = null; /** * 此连接是否正在使用的标志,默认没有正在使用 */ private boolean busy = false; /** * 构造函数,根据一个 Connection 构告一个 PooledConnection 对象 */ public ConnectionWrapper(Connection connection) { this.connection = connection; } /** * 返回此对象中的连接 */ public Connection getConnection() { return connection; } /** * 设置此对象的连接 */ public void setConnection(Connection connection) { this.connection = connection; } /** * 获得对象连接是否忙 */ public boolean isBusy() { return busy; } /** * 设置对象的连接正在忙 */ public void setBusy(boolean busy) { this.busy = busy; } } }
init()方法实现在初始化连接池的时候创建默认数值的连接。
五,配置操作服务类
操作类接口 HbaseService.java
package com.hbase.service; import org.apache.hadoop.hbase.client.Scan; import java.util.Map; /** * @InterfaceName: HbaseService * @author: Leemon * @Description: TODO * @date: 2023/4/12 18:11 * @version: 1.0 */ public interface HbaseService { Map
> getResultScanner(String tableName, String startRowKey, String stopRowKey); Map getRowData(String tableName, String rowKey); Map getFamilyValue(String tableName, String rowKey, String familyName); String getColumnValue(String tableName, String rowKey, String familyName, String columnName); Map > queryData(String tableName, Scan scan); } 接口实现类 HbaseServiceImpl.java
package com.hbase.service.impl; import com.hbase.config.HbaseConnectionPool; import com.hbase.service.HbaseService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.IOException; import java.text.MessageFormat; import java.util.*; /** * @ClassName: HbaseServiceImpl * @author: Leemon * @Description: TODO * @date: 2023/4/12 18:13 * @version: 1.0 */ @Slf4j @Service public class HbaseServiceImpl implements HbaseService { @Resource private HbaseConnectionPool pool; @Override public Map
> getResultScanner(String tableName, String startRowKey, String stopRowKey){ Scan scan = new Scan(); if(StringUtils.isNotBlank(startRowKey) && StringUtils.isNotBlank(stopRowKey)){ scan.withStartRow(Bytes.toBytes(startRowKey)); scan.withStopRow(Bytes.toBytes(stopRowKey)); } return this.queryData(tableName,scan); } public Map > getResultScannerPrefixFilter(String tableName, String prefix){ Scan scan = new Scan(); if(StringUtils.isNotBlank(prefix)){ Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); scan.setFilter(filter); } return this.queryData(tableName,scan); } @Override public Map > queryData(String tableName, Scan scan){ Map > result = new HashMap<>(); ResultScanner rs = null; // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); rs = table.getScanner(scan); for (Result r : rs) { //每一行数据 Map columnMap = new HashMap<>(); String rowKey = null; for (Cell cell : r.listCells()) { if(rowKey == null){ rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()); } columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } if(rowKey != null){ result.put(rowKey,columnMap); } } }catch (IOException e) { log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}" ,tableName),e); }finally { close(null, rs, table, connection); } return result; } @Override public Map getRowData(String tableName, String rowKey){ //返回的键值对 Map result = new HashMap<>(); Get get = new Get(Bytes.toBytes(rowKey)); // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); Result hTableResult = table.get(get); if (hTableResult != null && !hTableResult.isEmpty()) { for (Cell cell : hTableResult.listCells()) { result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } // 某些应用场景需要插入到数据库的时间 if (hTableResult.listCells().size() > 0) { result.put("Timestamp", hTableResult.listCells().get(0).getTimestamp() + ""); } } }catch (IOException e) { log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}" ,tableName,rowKey),e); }finally { close(null,null, table, connection); } return result; } @Override public Map getFamilyValue(String tableName, String rowKey, String familyName){ //返回的键值对 Map result = new HashMap<>(2); Get get = new Get(Bytes.toBytes(rowKey)); get.addFamily(Bytes.toBytes(familyName)); // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); Result getResult = table.get(get); if (getResult != null && !getResult.isEmpty()) { for (Cell cell : getResult.listCells()) { result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } } catch (IOException e) { log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2}" , tableName, rowKey, familyName), e); }finally { close(null,null, table, connection); } return result; } @Override public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){ String str = null; Get get = new Get(Bytes.toBytes(rowKey)); // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); Result result = table.get(get); if (result != null && !result.isEmpty()) { Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); if(cell != null){ str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); } } } catch (IOException e) { log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}" ,tableName,rowKey,familyName,columnName),e); }finally { close(null,null, table, connection); } return str; } private Table getTable(Connection connection, String tableName) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); return table; } private void close(Admin admin, ResultScanner rs, Table table, Connection connection){ if(admin != null){ try { admin.close(); } catch (IOException e) { log.error("关闭Admin失败",e); } } if(rs != null){ rs.close(); } if(table != null){ try { table.close(); } catch (IOException e) { log.error("关闭Table失败",e); } } // 释放连接 if (Objects.nonNull(connection)) { pool.releaseConnection(connection); } } } ok,现在就可以操作使用了。
以前都是在非Spring环境下使用Hbase的,一开始会出现:当服务使用时间过久,某些会使用hbase的接口调用次数过多的时候,会报【已超过最大的连接数】,只能每一次调用接口后最后一行加上释放连接。(以前的做法每次调用都要在代码里手动获取一个连接)
这次将释放连接都集成在操作服务类的实现方法中,避免了开发接口可能遗漏的错误,可能不会再出现这个问题。
猜你喜欢
- 17天前(上海文旅产业发展高峰论坛)《上海打造文旅元宇宙新赛道行动方案》发布
- 17天前(哈弗h9优惠9万是真的吗)热浪来袭,哈弗H9超值补贴火热加码
- 17天前(重庆恐龙化石遗址)重庆黔江恐龙化石抢救性发掘新闻发布会举行
- 17天前(曼谷丽思卡尔顿公寓价格)曼谷丽思卡尔顿酒店盛大启幕,开创泰国奢华雅致新纪元
- 17天前(澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布)澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布
- 17天前(美诺酒店集团旗下臻选品牌m collection)美诺酒店集团启动盛橡品牌战略焕新 开启全球扩张新篇章
- 17天前(世茂海峡大厦多高)巴西地产高管齐聚厦门世茂海峡大厦 共探超高层建筑锻造经验
- 17天前(锦州新增两家国家aaa级旅游景区有哪些)锦州新增两家国家AAA级旅游景区
- 17天前(泛舟诗海觅春迹什么意思)泛舟觅桃源,又一头部机构下场文旅赛道
- 17天前(阿斯塔纳航空属于哪个联盟)阿斯塔纳航空荣获Skytrax世界航空公司大奖,将继续助力中哈交流往来
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章