SpringBoot使用Hbase
文章目录
- SpringBoot使用Hbase
- 一,引入依赖
- 二,配置文件添加自己的属性
- 三,配置类注入HBASE配置
- 四,配置Hbase连接池
- 五,配置操作服务类
一,引入依赖
org.apache.hbase hbase-client 2.3.2 org.slf4j slf4j-log4j12 二,配置文件添加自己的属性
hbase: zookeeper: quorum: 10.xxx.xx.153,10.xxx.xx.154,10.xxx.xx.155 property: clientPort: 2181 master: port: 9001
三,配置类注入HBASE配置
package com.hbase.config; import org.apache.hadoop.hbase.HBaseConfiguration; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ClassName: HBaseConfig * @author: Leemon * @Description: TODO * @date: 2023/4/12 18:06 * @version: 1.0 */ @Configuration @RefreshScope public class HBaseConfig { @Value("${hbase.zookeeper.quorum}") private String zookeeperQuorum; @Value("${hbase.zookeeper.property.clientPort}") private String clientPort; @Value("${hbase.master.port}") private String masterPort; @Bean public org.apache.hadoop.conf.Configuration hbaseConfiguration() { org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", zookeeperQuorum); conf.set("hbase.zookeeper.property.clientPort", clientPort); // 如果hbase是集群,这个必须加上 // 这个ip和端口是在hadoop/mapred-site.xml配置文件配置的 conf.set("hbase.master", zookeeperQuorum + ":" + masterPort); conf.set("hbase.client.keyvalue.maxsize", "20971520"); conf = HBaseConfiguration.create(conf); return conf; } }
四,配置Hbase连接池
这里没有使用懒加载模式,减少启动后第一次访问时访问时间过长
package com.hbase.config; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.io.IOException; import java.util.Enumeration; import java.util.Vector; /** * @ClassName: HbaseConnectionPool * @author: Leemon * @Description: TODO * @date: 2023/4/13 9:45 * @version: 1.0 */ @Component @Slf4j public class HbaseConnectionPool { /** * 连接池最大的大小 */ private int nMaxConnections = 20; /** * 连接池自动增加的大小 */ private int nIncrConnectionAmount = 3; /** * 连接池的初始大小 */ private int nInitConnectionAmount = 3; /** * 存放连接池中数据库连接的向量,初始时为null */ private Vector vcConnections = null; @Resource private Configuration hbaseConfiguration; @PostConstruct public void init() { try { vcConnections = new Vector(); createConnections(nInitConnectionAmount); } catch (Exception e) { e.printStackTrace(); } } public synchronized Connection getConnection() { // 确保连接池己被创建 if (vcConnections == null) { // 连接池还没创建,则返回null return null; } // 获得一个可用的数据库连接 Connection conn = getFreeConnection(); // 如果目前没有可以使用的连接,即所有的连接都在使用中 while (conn == null) { // 等一会再试 try { wait(250); } catch (InterruptedException e) { e.printStackTrace(); } // 重新再试,直到获得可用的连接,如果getFreeConnection()返回的为null,则表明创建一批连接后也不可获得可用连接 conn = getFreeConnection(); } // 返回获得的可用的连接 return conn; } /** * 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果 * 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置 * 的值创建几个数据库连接,并放入连接池中。 * 如果创建后,所有的连接仍都在使用中,则返回 null * @return * 返回一个可用的数据库连接 */ private Connection getFreeConnection() { // 从连接池中获得一个可用的数据库连接 Connection conn = findFreeConnection(); if (conn == null) { // 如果目前连接池中没有可用的连接 // 创建一些连接 try { createConnections(nIncrConnectionAmount); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); log.error("create new connection fail.", e); } // 重新从池中查找是否有可用连接 conn = findFreeConnection(); if (conn == null) { // 如果创建连接后仍获得不到可用的连接,则返回 null return null; } } return conn; } /** * 创建由 numConnections 指定数目的数据库连接 , 并把这些连接 * 放入 connections 向量中 * @param _nNumConnections 要创建的数据库连接的数目 * @throws Exception */ private void createConnections(int _nNumConnections) throws Exception { // 循环创建指定数目的数据库连接 for (int x = 0; x < _nNumConnections; x++) { // 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections // 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。 // 如果连接数己经达到最大,即退出。 if (this.nMaxConnections > 0 && this.vcConnections.size() >= this.nMaxConnections) { log.warn("已达到最大连接数,不能再增加连接"); throw new Exception("已达到最大连接数"+ nMaxConnections+",不能再增加连接"); } // 增加一个连接到连接池中(向量 connections 中) vcConnections.addElement(new ConnectionWrapper(newConnection())); log.info("HBase数据库连接己创建 ...... " + x); } } /** * 查找池中所有的連接,查找一个可用的數據庫連接, * 如果没有可用的連結,返回null * @return * 返回一個可用的數據庫連接 */ private Connection findFreeConnection() { Connection conn = null; ConnectionWrapper connWrapper = null; //獲得連接池向量中所有的對象 Enumeration enumerate = vcConnections.elements(); //遍歷所有的对象,看是否有可用的連接 while (enumerate.hasMoreElements()) { connWrapper = (ConnectionWrapper) enumerate.nextElement(); if (!connWrapper.isBusy()) { //如果此對象不忙,則獲得它的數據庫連接并把它設為忙 conn = connWrapper.getConnection(); connWrapper.setBusy(true); // 己经找到一个可用的連接,退出 break; } } // 返回找到的可用連接 return conn; } /** *创建一个新的数据库连接并返回它 * @return * 返回一个新创建的数据库连接 */ private Connection newConnection() { /** hbase 连接 */ Connection conn = null; // 创建一个数据库连接 try { conn = ConnectionFactory.createConnection(hbaseConfiguration); } catch (IOException e) { log.error("创建HBase数据库连接失败!"); e.printStackTrace(); } // 返回创建的新的数据库连接 return conn; } public synchronized void releaseConnection(Connection conn) { if (this.vcConnections == null) { log.info("连接池不存在,无法返回此连接到连接池中!!"); } else { ConnectionWrapper connWrapper = null; Enumeration enumerate = this.vcConnections.elements(); while(enumerate.hasMoreElements()) { connWrapper = (ConnectionWrapper) enumerate.nextElement(); if (conn == connWrapper.getConnection()) { connWrapper.setBusy(false); break; } } } } class ConnectionWrapper { /** * 数据库连接 */ private Connection connection = null; /** * 此连接是否正在使用的标志,默认没有正在使用 */ private boolean busy = false; /** * 构造函数,根据一个 Connection 构告一个 PooledConnection 对象 */ public ConnectionWrapper(Connection connection) { this.connection = connection; } /** * 返回此对象中的连接 */ public Connection getConnection() { return connection; } /** * 设置此对象的连接 */ public void setConnection(Connection connection) { this.connection = connection; } /** * 获得对象连接是否忙 */ public boolean isBusy() { return busy; } /** * 设置对象的连接正在忙 */ public void setBusy(boolean busy) { this.busy = busy; } } }
init()方法实现在初始化连接池的时候创建默认数值的连接。
五,配置操作服务类
操作类接口 HbaseService.java
package com.hbase.service; import org.apache.hadoop.hbase.client.Scan; import java.util.Map; /** * @InterfaceName: HbaseService * @author: Leemon * @Description: TODO * @date: 2023/4/12 18:11 * @version: 1.0 */ public interface HbaseService { Map
> getResultScanner(String tableName, String startRowKey, String stopRowKey); Map getRowData(String tableName, String rowKey); Map getFamilyValue(String tableName, String rowKey, String familyName); String getColumnValue(String tableName, String rowKey, String familyName, String columnName); Map > queryData(String tableName, Scan scan); } 接口实现类 HbaseServiceImpl.java
package com.hbase.service.impl; import com.hbase.config.HbaseConnectionPool; import com.hbase.service.HbaseService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.IOException; import java.text.MessageFormat; import java.util.*; /** * @ClassName: HbaseServiceImpl * @author: Leemon * @Description: TODO * @date: 2023/4/12 18:13 * @version: 1.0 */ @Slf4j @Service public class HbaseServiceImpl implements HbaseService { @Resource private HbaseConnectionPool pool; @Override public Map
> getResultScanner(String tableName, String startRowKey, String stopRowKey){ Scan scan = new Scan(); if(StringUtils.isNotBlank(startRowKey) && StringUtils.isNotBlank(stopRowKey)){ scan.withStartRow(Bytes.toBytes(startRowKey)); scan.withStopRow(Bytes.toBytes(stopRowKey)); } return this.queryData(tableName,scan); } public Map > getResultScannerPrefixFilter(String tableName, String prefix){ Scan scan = new Scan(); if(StringUtils.isNotBlank(prefix)){ Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); scan.setFilter(filter); } return this.queryData(tableName,scan); } @Override public Map > queryData(String tableName, Scan scan){ Map > result = new HashMap<>(); ResultScanner rs = null; // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); rs = table.getScanner(scan); for (Result r : rs) { //每一行数据 Map columnMap = new HashMap<>(); String rowKey = null; for (Cell cell : r.listCells()) { if(rowKey == null){ rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()); } columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } if(rowKey != null){ result.put(rowKey,columnMap); } } }catch (IOException e) { log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}" ,tableName),e); }finally { close(null, rs, table, connection); } return result; } @Override public Map getRowData(String tableName, String rowKey){ //返回的键值对 Map result = new HashMap<>(); Get get = new Get(Bytes.toBytes(rowKey)); // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); Result hTableResult = table.get(get); if (hTableResult != null && !hTableResult.isEmpty()) { for (Cell cell : hTableResult.listCells()) { result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } // 某些应用场景需要插入到数据库的时间 if (hTableResult.listCells().size() > 0) { result.put("Timestamp", hTableResult.listCells().get(0).getTimestamp() + ""); } } }catch (IOException e) { log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}" ,tableName,rowKey),e); }finally { close(null,null, table, connection); } return result; } @Override public Map getFamilyValue(String tableName, String rowKey, String familyName){ //返回的键值对 Map result = new HashMap<>(2); Get get = new Get(Bytes.toBytes(rowKey)); get.addFamily(Bytes.toBytes(familyName)); // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); Result getResult = table.get(get); if (getResult != null && !getResult.isEmpty()) { for (Cell cell : getResult.listCells()) { result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } } catch (IOException e) { log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2}" , tableName, rowKey, familyName), e); }finally { close(null,null, table, connection); } return result; } @Override public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){ String str = null; Get get = new Get(Bytes.toBytes(rowKey)); // 获取表 Table table= null; Connection connection = null; try { connection = pool.getConnection(); table = getTable(connection, tableName); Result result = table.get(get); if (result != null && !result.isEmpty()) { Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); if(cell != null){ str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); } } } catch (IOException e) { log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}" ,tableName,rowKey,familyName,columnName),e); }finally { close(null,null, table, connection); } return str; } private Table getTable(Connection connection, String tableName) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); return table; } private void close(Admin admin, ResultScanner rs, Table table, Connection connection){ if(admin != null){ try { admin.close(); } catch (IOException e) { log.error("关闭Admin失败",e); } } if(rs != null){ rs.close(); } if(table != null){ try { table.close(); } catch (IOException e) { log.error("关闭Table失败",e); } } // 释放连接 if (Objects.nonNull(connection)) { pool.releaseConnection(connection); } } } ok,现在就可以操作使用了。
以前都是在非Spring环境下使用Hbase的,一开始会出现:当服务使用时间过久,某些会使用hbase的接口调用次数过多的时候,会报【已超过最大的连接数】,只能每一次调用接口后最后一行加上释放连接。(以前的做法每次调用都要在代码里手动获取一个连接)
这次将释放连接都集成在操作服务类的实现方法中,避免了开发接口可能遗漏的错误,可能不会再出现这个问题。
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章