上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

SpringBoot使用Hbase

guduadmin15小时前

SpringBoot使用Hbase

文章目录

  • SpringBoot使用Hbase
  • 一,引入依赖
  • 二,配置文件添加自己的属性
  • 三,配置类注入HBASE配置
  • 四,配置Hbase连接池
  • 五,配置操作服务类

    一,引入依赖

    		
                org.apache.hbase
                hbase-client
                2.3.2
                
                    
                        org.slf4j
                        slf4j-log4j12
                    
                
            
    

    二,配置文件添加自己的属性

    hbase:
      zookeeper:
        quorum: 10.xxx.xx.153,10.xxx.xx.154,10.xxx.xx.155
        property:
          clientPort: 2181
      master:
        port: 9001
    

    三,配置类注入HBASE配置

    package com.hbase.config;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /**
     * @ClassName: HBaseConfig
     * @author: Leemon
     * @Description: TODO
     * @date: 2023/4/12 18:06
     * @version: 1.0
     */
    @Configuration
    @RefreshScope
    public class HBaseConfig {
        @Value("${hbase.zookeeper.quorum}")
        private String zookeeperQuorum;
        @Value("${hbase.zookeeper.property.clientPort}")
        private String clientPort;
        @Value("${hbase.master.port}")
        private String masterPort;
        @Bean
        public org.apache.hadoop.conf.Configuration hbaseConfiguration() {
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
            conf.set("hbase.zookeeper.property.clientPort", clientPort);
            // 如果hbase是集群,这个必须加上
            // 这个ip和端口是在hadoop/mapred-site.xml配置文件配置的
            conf.set("hbase.master", zookeeperQuorum + ":" + masterPort);
            conf.set("hbase.client.keyvalue.maxsize", "20971520");
            conf = HBaseConfiguration.create(conf);
            return conf;
        }
    }
    

    四,配置Hbase连接池

    这里没有使用懒加载模式,减少启动后第一次访问时访问时间过长

    package com.hbase.config;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import java.io.IOException;
    import java.util.Enumeration;
    import java.util.Vector;
    /**
     * @ClassName: HbaseConnectionPool
     * @author: Leemon
     * @Description: TODO
     * @date: 2023/4/13 9:45
     * @version: 1.0
     */
    @Component
    @Slf4j
    public class HbaseConnectionPool {
        /**
         * 连接池最大的大小
         */
        private int nMaxConnections = 20;
        /**
         * 连接池自动增加的大小
         */
        private int nIncrConnectionAmount = 3;
        /**
         * 连接池的初始大小
         */
        private int nInitConnectionAmount = 3;
        /**
         * 存放连接池中数据库连接的向量,初始时为null
         */
        private Vector vcConnections = null;
        @Resource
        private Configuration hbaseConfiguration;
        @PostConstruct
        public void init() {
            try {
                vcConnections = new Vector();
                createConnections(nInitConnectionAmount);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public synchronized Connection getConnection() {
            // 确保连接池己被创建
            if (vcConnections == null) {
                // 连接池还没创建,则返回null
                return null;
            }
            // 获得一个可用的数据库连接
            Connection conn = getFreeConnection();
            // 如果目前没有可以使用的连接,即所有的连接都在使用中
            while (conn == null) {
                // 等一会再试
                try {
                    wait(250);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 重新再试,直到获得可用的连接,如果getFreeConnection()返回的为null,则表明创建一批连接后也不可获得可用连接
                conn = getFreeConnection();
            }
            // 返回获得的可用的连接
            return conn;
        }
        /**
         * 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果
         * 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置
         * 的值创建几个数据库连接,并放入连接池中。
         * 如果创建后,所有的连接仍都在使用中,则返回 null
         * @return
         * 		返回一个可用的数据库连接
         */
        private Connection getFreeConnection() {
            // 从连接池中获得一个可用的数据库连接
            Connection conn = findFreeConnection();
            if (conn == null) {
                // 如果目前连接池中没有可用的连接
                // 创建一些连接
                try {
                    createConnections(nIncrConnectionAmount);
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                    log.error("create new connection fail.", e);
                }
                // 重新从池中查找是否有可用连接
                conn = findFreeConnection();
                if (conn == null) {
                    // 如果创建连接后仍获得不到可用的连接,则返回 null
                    return null;
                }
            }
            return conn;
        }
        /**
         * 创建由 numConnections 指定数目的数据库连接 , 并把这些连接
         * 放入 connections 向量中
         * @param _nNumConnections 要创建的数据库连接的数目
         * @throws Exception
         */
        private void createConnections(int _nNumConnections) throws Exception {
            // 循环创建指定数目的数据库连接
            for (int x = 0; x < _nNumConnections; x++) {
                // 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections
                // 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。
                // 如果连接数己经达到最大,即退出。
                if (this.nMaxConnections > 0  && this.vcConnections.size() >= this.nMaxConnections) {
                    log.warn("已达到最大连接数,不能再增加连接");
                    throw new Exception("已达到最大连接数"+ nMaxConnections+",不能再增加连接");
                }
                // 增加一个连接到连接池中(向量 connections 中)
                vcConnections.addElement(new ConnectionWrapper(newConnection()));
                log.info("HBase数据库连接己创建 ...... " + x);
            }
        }
        /**
         * 查找池中所有的連接,查找一个可用的數據庫連接,
         * 如果没有可用的連結,返回null
         * @return
         * 		返回一個可用的數據庫連接
         */
        private Connection findFreeConnection() {
            Connection conn = null;
            ConnectionWrapper connWrapper = null;
            //獲得連接池向量中所有的對象
            Enumeration enumerate = vcConnections.elements();
            //遍歷所有的对象,看是否有可用的連接
            while (enumerate.hasMoreElements()) {
                connWrapper = (ConnectionWrapper) enumerate.nextElement();
                if (!connWrapper.isBusy()) {
                    //如果此對象不忙,則獲得它的數據庫連接并把它設為忙
                    conn = connWrapper.getConnection();
                    connWrapper.setBusy(true);
                    // 己经找到一个可用的連接,退出
                    break;
                }
            }
            // 返回找到的可用連接
            return conn;
        }
        /**
         *创建一个新的数据库连接并返回它
         * @return
         * 		返回一个新创建的数据库连接
         */
        private Connection newConnection() {
            /** hbase 连接 */
            Connection conn = null;
            // 创建一个数据库连接
            try {
                conn = ConnectionFactory.createConnection(hbaseConfiguration);
            } catch (IOException e) {
                log.error("创建HBase数据库连接失败!");
                e.printStackTrace();
            }
            // 返回创建的新的数据库连接
            return conn;
        }
        public synchronized void releaseConnection(Connection conn) {
            if (this.vcConnections == null) {
                log.info("连接池不存在,无法返回此连接到连接池中!!");
            } else {
                ConnectionWrapper connWrapper = null;
                Enumeration enumerate = this.vcConnections.elements();
                while(enumerate.hasMoreElements()) {
                    connWrapper = (ConnectionWrapper) enumerate.nextElement();
                    if (conn == connWrapper.getConnection()) {
                        connWrapper.setBusy(false);
                        break;
                    }
                }
            }
        }
        class ConnectionWrapper {
            
            /**
             * 数据库连接
             */
            private Connection connection = null;
            
            /**
             * 此连接是否正在使用的标志,默认没有正在使用
             */
            private boolean busy = false;
            /**
             * 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
             */
            public ConnectionWrapper(Connection connection) {
                this.connection = connection;
            }
            /**
             * 返回此对象中的连接
             */
            public Connection getConnection() {
                return connection;
            }
            /**
             * 设置此对象的连接
             */
            public void setConnection(Connection connection) {
                this.connection = connection;
            }
            /**
             * 获得对象连接是否忙
             */
            public boolean isBusy() {
                return busy;
            }
            /**
             * 设置对象的连接正在忙
             */
            public void setBusy(boolean busy) {
                this.busy = busy;
            }
        }
    }
    

    init()方法实现在初始化连接池的时候创建默认数值的连接。

    五,配置操作服务类

    操作类接口 HbaseService.java

    package com.hbase.service;
    import org.apache.hadoop.hbase.client.Scan;
    import java.util.Map;
    /**
     * @InterfaceName: HbaseService
     * @author: Leemon
     * @Description: TODO
     * @date: 2023/4/12 18:11
     * @version: 1.0
     */
    public interface HbaseService {
        Map> getResultScanner(String tableName, String startRowKey, String stopRowKey);
        Map getRowData(String tableName, String rowKey);
        Map getFamilyValue(String tableName, String rowKey, String familyName);
        String getColumnValue(String tableName, String rowKey, String familyName, String columnName);
        Map> queryData(String tableName, Scan scan);
    }
    

    接口实现类 HbaseServiceImpl.java

    package com.hbase.service.impl;
    import com.hbase.config.HbaseConnectionPool;
    import com.hbase.service.HbaseService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.springframework.stereotype.Service;
    import javax.annotation.Resource;
    import java.io.IOException;
    import java.text.MessageFormat;
    import java.util.*;
    /**
     * @ClassName: HbaseServiceImpl
     * @author: Leemon
     * @Description: TODO
     * @date: 2023/4/12 18:13
     * @version: 1.0
     */
    @Slf4j
    @Service
    public class HbaseServiceImpl implements HbaseService {
        @Resource
        private HbaseConnectionPool pool;
        @Override
        public Map> getResultScanner(String tableName, String startRowKey, String stopRowKey){
            Scan scan = new Scan();
            if(StringUtils.isNotBlank(startRowKey) && StringUtils.isNotBlank(stopRowKey)){
                scan.withStartRow(Bytes.toBytes(startRowKey));
                scan.withStopRow(Bytes.toBytes(stopRowKey));
            }
            return this.queryData(tableName,scan);
        }
        public Map> getResultScannerPrefixFilter(String tableName, String prefix){
            Scan scan = new Scan();
            if(StringUtils.isNotBlank(prefix)){
                Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
                scan.setFilter(filter);
            }
            return this.queryData(tableName,scan);
        }
        @Override
        public Map> queryData(String tableName, Scan scan){
            
            Map> result = new HashMap<>();
            ResultScanner rs = null;
            // 获取表
            Table table= null;
            Connection connection = null;
            try {
                connection = pool.getConnection();
                table = getTable(connection, tableName);
                rs = table.getScanner(scan);
                for (Result r : rs) {
                    //每一行数据
                    Map columnMap = new HashMap<>();
                    String rowKey = null;
                    for (Cell cell : r.listCells()) {
                        if(rowKey == null){
                            rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
                        }
                        columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                    if(rowKey != null){
                        result.put(rowKey,columnMap);
                    }
                }
            }catch (IOException e) {
                log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}"
                        ,tableName),e);
            }finally {
                close(null, rs, table, connection);
            }
            return result;
        }
        @Override
        public Map getRowData(String tableName, String rowKey){
            //返回的键值对
            Map result = new HashMap<>();
            Get get = new Get(Bytes.toBytes(rowKey));
            // 获取表
            Table table= null;
            Connection connection = null;
            try {
                connection = pool.getConnection();
                table = getTable(connection, tableName);
                Result hTableResult = table.get(get);
                if (hTableResult != null && !hTableResult.isEmpty()) {
                    for (Cell cell : hTableResult.listCells()) {
                        result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                    // 某些应用场景需要插入到数据库的时间
                    if (hTableResult.listCells().size() > 0) {
                        result.put("Timestamp", hTableResult.listCells().get(0).getTimestamp() + "");
                    }
                }
            }catch (IOException e) {
                log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}"
                        ,tableName,rowKey),e);
            }finally {
                close(null,null, table, connection);
            }
            return result;
        }
        @Override
        public Map getFamilyValue(String tableName, String rowKey, String familyName){
            //返回的键值对
            Map result = new HashMap<>(2);
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(Bytes.toBytes(familyName));
            // 获取表
            Table table= null;
            Connection connection = null;
            try {
                connection = pool.getConnection();
                table = getTable(connection, tableName);
                Result getResult = table.get(get);
                if (getResult != null && !getResult.isEmpty()) {
                    for (Cell cell : getResult.listCells()) {
                        result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                }
            } catch (IOException e) {
                log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2}"
                        , tableName, rowKey, familyName), e);
            }finally {
                close(null,null, table, connection);
            }
            return result;
        }
        @Override
        public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){
            String str = null;
            Get get = new Get(Bytes.toBytes(rowKey));
            // 获取表
            Table table= null;
            Connection connection = null;
            try {
                connection = pool.getConnection();
                table = getTable(connection, tableName);
                Result result = table.get(get);
                if (result != null && !result.isEmpty()) {
                    Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
                    if(cell != null){
                        str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    }
                }
            } catch (IOException e) {
                log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}"
                        ,tableName,rowKey,familyName,columnName),e);
            }finally {
                close(null,null, table, connection);
            }
            return str;
        }
        private Table getTable(Connection connection, String tableName) throws IOException {
            Table table = connection.getTable(TableName.valueOf(tableName));
            return table;
        }
        private void close(Admin admin, ResultScanner rs, Table table, Connection connection){
            if(admin != null){
                try {
                    admin.close();
                } catch (IOException e) {
                    log.error("关闭Admin失败",e);
                }
            }
            if(rs != null){
                rs.close();
            }
            if(table != null){
                try {
                    table.close();
                } catch (IOException e) {
                    log.error("关闭Table失败",e);
                }
            }
            // 释放连接
            if (Objects.nonNull(connection)) {
                pool.releaseConnection(connection);
            }
        }
    }
    

    ok,现在就可以操作使用了。

    以前都是在非Spring环境下使用Hbase的,一开始会出现:当服务使用时间过久,某些会使用hbase的接口调用次数过多的时候,会报【已超过最大的连接数】,只能每一次调用接口后最后一行加上释放连接。(以前的做法每次调用都要在代码里手动获取一个连接)

    这次将释放连接都集成在操作服务类的实现方法中,避免了开发接口可能遗漏的错误,可能不会再出现这个问题。

网友评论

搜索
最新文章
热门文章
热门标签