上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Java连接ES的多种方式

guduadmin27月前

前言

本篇文章主要介绍:使用Basic、Transport、Kerberos三种方式连接ES

Basic方式

/**
 * Connects to Elasticsearch over REST with HTTP Basic authentication, then
 * lists all indices and prints the hits of the requested index.
 *
 * <p>The client is stored in the shared static {@code client} field and is
 * released through {@code closeRes()} before this method returns, so repeated
 * calls (one per user) do not leak a client each time. The queries only run
 * when the client was built successfully.
 *
 * @param user user name
 * @param password password (never written to the console)
 * @param index index name
 */
private static void basicConnect(String user, String password, String index) {
    System.out.println("Basic访问start........................");
    try {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        // Only the user name is echoed; printing the password would leak it into logs.
        System.out.println("用户为:" + user);
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(PropertiesUtil.esHost, PropertiesUtil.esPort))
                        .setHttpClientConfigCallback(httpClientBuilder ->
                                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
        // List every index visible to this user.
        PropertiesUtil.getIndics(client);
        // Read the requested index.
        PropertiesUtil.getIndex(client, index);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Same clean-up the unauthenticated REST variant performs.
        closeRes();
    }
    System.out.println("Basic访问end........................");
}

Transport方式

/**
 * Connects to Elasticsearch through the transport protocol (port 9300) and
 * creates the given index when it does not exist yet.
 *
 * <p>The transport client is opened in a try-with-resources block so it is
 * always closed, including when index creation fails. A Basic
 * {@code Authorization} header is attached to the client's thread context;
 * the header value is deliberately not printed because Base64 is trivially
 * reversible.
 *
 * @param index index name
 */
public static void transportConnect(String index){
    try {
        System.out.println("transport访问start........................");
        // Resolve the host first so a lookup failure cannot leave a half-built client open.
        InetAddress esAddress = InetAddress.getByName(PropertiesUtil.esHost);
        try (TransportClient client = new PreBuiltTransportClient(getESSetting())) {
            client.addTransportAddress(new TransportAddress(esAddress, 9300));
            // Demo credentials, hard-coded exactly as in the original sample.
            String credentials = "admin" + ":" + "admin";
            String basicAuth = "Basic " + Base64.getEncoder()
                    .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
            client.threadPool().getThreadContext().putHeader("Authorization", basicAuth);
            System.out.println("获取client成功........................");
            System.out.println("尝试创建index:" + index);
            if (!PropertiesUtil.isExistIndex(client, index)) {
                CreateIndexResponse cir = client.admin().indices().create(new CreateIndexRequest(index)).actionGet();
                System.out.println(cir.isAcknowledged());
            } else {
                System.out.println(index + "已存在");
            }
            System.out.println("transport访问end........................");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Builds the settings handed to the transport client.
 *
 * @return settings whose only entry is {@code cluster.name}, taken from
 *         {@code PropertiesUtil.clusterName}
 */
public static Settings getESSetting() {
    Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put("cluster.name", PropertiesUtil.clusterName);
    return settingsBuilder.build();
}

Kerberos方式

/**
 * Connects to Elasticsearch over REST using a SPNEGO/Kerberos client
 * callback, then lists all indices and prints the hits of the given index.
 *
 * <p>The REST client is scoped to a try-with-resources block and is closed
 * when the method returns, whether or not the queries succeed.
 *
 * @param index index name
 */
public static void kerberosConnect(String index) {
    try {
        System.out.println("kerberos访问start........................");
        System.out.println("开始认证........................");
        // NOTE(review): this only constructs the callback handler; whether a Kerberos
        // login has actually happened at this point is not visible here - confirm
        // before trusting the success message below.
        SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler(
                PropertiesUtil.principal, PropertiesUtil.ketTabPath, true);
        System.out.println("认证成功........................");

        // Typed list: with a raw List, toArray(T[]) yields Object[] and the
        // assignment to HttpHost[] does not compile.
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost(PropertiesUtil.esHost, PropertiesUtil.esPort, "http"));
        HttpHost[] httpHosts = hosts.toArray(new HttpHost[0]);

        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        restClientBuilder.setHttpClientConfigCallback(callbackHandler);
        try (RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder)) {
            System.out.println("获取所有的索引........................");
            PropertiesUtil.getIndics(restHighLevelClient);
            System.out.println("获取" + index + "索引数据........................");
            PropertiesUtil.getIndex(restHighLevelClient, index);
            System.out.println("kerberos访问end.........................................");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

正常访问

/**
 * Connects to Elasticsearch over REST without any authentication, then lists
 * all indices and prints the hits of the given index.
 *
 * <p>The client is stored in the shared static {@code client} field and is
 * released through {@code closeRes()} before the method returns.
 *
 * @param index index name
 */
private static void normalConnect(String index){
    try {
        System.out.println("正常访问start.........................................");
        // Typed list: with a raw List, toArray(T[]) yields Object[] and the
        // assignment to HttpHost[] does not compile.
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost(PropertiesUtil.esHost, PropertiesUtil.esPort, "http"));
        HttpHost[] httpHosts = hosts.toArray(new HttpHost[0]);

        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        Header[] defaultHeaders = new Header[]{
                new BasicHeader("Accept", "application/json"),
                new BasicHeader("Content-type", "application/json")};
        restClientBuilder.setDefaultHeaders(defaultHeaders);

        client = new RestHighLevelClient(restClientBuilder);
        PropertiesUtil.getIndics(client);
        PropertiesUtil.getIndex(client, index);
        System.out.println("正常访问end.........................................");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        closeRes();
    }
}

/**
 * Misspelled former name of {@link #normalConnect(String)}, kept so existing
 * references keep compiling.
 *
 * @param index index name
 * @deprecated use {@link #normalConnect(String)}
 */
@Deprecated
private static void rnormalConnect(String index){
    normalConnect(index);
}

综合测试

  • EsConnectDemo

    /**
     * Command-line driver that exercises every Elasticsearch connection style
     * described in this article against one index.
     *
     * <p>Usage: pass exactly one argument, the name of the index to test.
     */
    public class EsConnectDemo {
        /** REST client shared by the REST-based connection helpers; released by {@link #closeRes()}. */
        private static RestHighLevelClient client;

        /**
         * Runs the Kerberos self-check, then the connection tests for the index
         * named by the single command-line argument.
         *
         * @param args expects exactly one element: the index name
         */
        public static void main(String[] args) {
            try {
                // Check that the Kerberos login itself works.
                kerberos();
                if (args.length != 1) {
                    System.out.println("请输入要测试的索引名称");
                    // Stop here: reading args[0] without an argument would throw
                    // ArrayIndexOutOfBoundsException.
                    return;
                }
                String index = args[0];
                // Functional verification against ES.
                busisses(index);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Verifies that a Kerberos login with the configured principal and keytab
         * works, and prints the private credentials of the resulting subject.
         */
        public static void kerberos() {
            try {
                System.setProperty("http.auth.preference", "Kerberos");
                System.setProperty("java.security.krb5.conf", PropertiesUtil.krb5ConfPath);
                System.setProperty("sun.security.krb5.debug", "false");
                System.setProperty("sun.security.spnego.debug", "false");

                String acceptorPrincipal = PropertiesUtil.principal;
                Path acceptorKeyTabPath = Paths.get(PropertiesUtil.ketTabPath);
                Set<String> principals = new HashSet<>();
                principals.add(acceptorPrincipal);

                final Subject subject = JaasKrbUtil.loginUsingKeytab(principals, acceptorKeyTabPath, true);
                Set<Object> privateCredentials = subject.getPrivateCredentials();
                System.out.println("getPrivateCredentials:----------------------");
                privateCredentials.forEach(System.out::println);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Runs every connection test in turn. Each REST test reads the full index
         * list and the data of the given index.
         *
         * @param index name of the index to read
         */
        private static void busisses(String index) {
            try {
                /* --- transport access --- */
                transportConnect(index);

                /* --- REST access --- */
                // Unauthenticated access
                normalConnect(index);
                // Basic: administrator
                basicConnect("admin", "admin", index);
                // Basic: super user
                basicConnect("super", "super", index);
                // Basic: normal user
                basicConnect("normal", "normal", index);
                // Kerberos access
                kerberosConnect(index);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeRes();
            }
        }

        /**
         * Closes the shared REST client, if any, and clears the field so that a
         * second call is a no-op instead of closing the same client twice.
         */
        public static void closeRes(){
            if (client == null) {
                return;
            }
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                client = null;
            }
        }
    }
     

    配置文件

    • kerberos.properties
      principal=example@HADOOP.COM
      keytabPath=/etc/keytabs/es.service.keytab
      krb5ConfPath=/etc/krb5.conf
      esHost=hostname
      esPort=9200
      clusterName=dkjhl

      文中提到的工具类

      • PropertiesUtil
        import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
        import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
        import org.elasticsearch.action.search.SearchRequest;
        import org.elasticsearch.action.search.SearchResponse;
        import org.elasticsearch.client.GetAliasesResponse;
        import org.elasticsearch.client.RequestOptions;
        import org.elasticsearch.client.RestHighLevelClient;
        import org.elasticsearch.client.transport.TransportClient;
        import org.elasticsearch.cluster.metadata.AliasMetaData;
        import org.elasticsearch.search.SearchHit;
        import org.elasticsearch.search.builder.SearchSourceBuilder;
        import java.io.FileInputStream;
        import java.io.InputStream;
        import java.util.Map;
        import java.util.Properties;
        import java.util.Set;
        /**
         * Loads {@code kerberos.properties} from the working directory once, at
         * class-initialization time, and exposes its values together with a few
         * small Elasticsearch helper calls used by the demo.
         */
        public class PropertiesUtil {
            /** HTTP port used when {@code esPort} is missing or not a number. */
            private static final int DEFAULT_ES_PORT = 9200;

            public static Properties prop = new Properties();
            static {
                try (InputStream is = new FileInputStream("kerberos.properties")){
                    prop.load(is);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("找不到kerberos.properties文件!");
                }
            }
            // NOTE: the field name "ketTabPath" is misspelled but is kept because callers use it.
            public static String ketTabPath = prop.getProperty("keytabPath");
            public static String krb5ConfPath = prop.getProperty("krb5ConfPath");
            public static String principal = prop.getProperty("principal");
            public static String esHost = prop.getProperty("esHost");
            public static int esPort = parsePort(prop.getProperty("esPort"));
            public static String clusterName = prop.getProperty("clusterName");

            /**
             * Parses the configured port, falling back to {@link #DEFAULT_ES_PORT}
             * instead of failing class initialization when the value is absent or
             * malformed (for example because the properties file could not be read).
             *
             * @param rawPort value of the {@code esPort} property, may be {@code null}
             * @return the parsed port, or the default
             */
            private static int parsePort(String rawPort) {
                if (rawPort == null || rawPort.trim().isEmpty()) {
                    return DEFAULT_ES_PORT;
                }
                try {
                    return Integer.parseInt(rawPort.trim());
                } catch (NumberFormatException e) {
                    System.out.println("Invalid esPort '" + rawPort + "', using " + DEFAULT_ES_PORT);
                    return DEFAULT_ES_PORT;
                }
            }

            /**
             * Checks whether an index exists.
             *
             * @param client transport client
             * @param indexName index name
             * @return {@code true} when the index exists; {@code false} when it does
             *         not or when the lookup failed (the failure is printed)
             */
            public static boolean isExistIndex(TransportClient client, String indexName) {
                boolean isExistIndex = false;
                try {
                    isExistIndex = client.admin().indices()
                            .exists(new IndicesExistsRequest().indices(indexName))
                            .actionGet().isExists();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return isExistIndex;
            }

            /**
             * Prints the name of every index returned by an unfiltered get-aliases
             * request.
             *
             * @param client REST client
             */
            public static void getIndics(RestHighLevelClient client){
                try {
                    GetAliasesRequest request = new GetAliasesRequest();
                    GetAliasesResponse getAliasesResponse = client.indices().getAlias(request, RequestOptions.DEFAULT);
                    // Keys of the alias map are the index names.
                    Map<String, Set<AliasMetaData>> aliasesByIndex = getAliasesResponse.getAliases();
                    for (String indexName : aliasesByIndex.keySet()) {
                        System.out.println(indexName);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            /**
             * Runs a search with an empty (default) source builder against the
             * given index and prints every hit of the response.
             *
             * @param client REST client
             * @param index index to search
             */
            public static void getIndex(RestHighLevelClient client, String index){
                try {
                    SearchRequest searchRequest = new SearchRequest(index);
                    SearchSourceBuilder builder = new SearchSourceBuilder();
                    searchRequest.source(builder);
                    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                    for (SearchHit hit : searchResponse.getHits().getHits()) {
                        System.out.println(hit);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        • JaasKrbUtil
          //Source: Apache Kerby project
          //https://directory.apache.org/kerby/
          import javax.security.auth.Subject;
          import javax.security.auth.callback.Callback;
          import javax.security.auth.callback.CallbackHandler;
          import javax.security.auth.callback.PasswordCallback;
          import javax.security.auth.callback.UnsupportedCallbackException;
          import javax.security.auth.kerberos.KerberosPrincipal;
          import javax.security.auth.login.AppConfigurationEntry;
          import javax.security.auth.login.Configuration;
          import javax.security.auth.login.LoginContext;
          import javax.security.auth.login.LoginException;
          import java.io.IOException;
          import java.nio.file.Path;
          import java.security.Principal;
          import java.util.HashMap;
          import java.util.HashSet;
          import java.util.Map;
          import java.util.Set;
          /**
           * JAAS utilities for Kerberos login.
           *
           * <p>Each {@code loginUsing*} method builds an in-memory JAAS
           * {@link Configuration} for the Krb5 login module, performs the login and
           * returns the authenticated {@link Subject}.
           */
          public final class JaasKrbUtil {
              /** Value passed as the {@code debug} option of every login module entry. */
              private static boolean debug = false;

              private JaasKrbUtil() {
              }

              /**
               * Turns the login module's {@code debug} option on or off for
               * configurations evaluated after this call.
               *
               * @param debuger new value of the debug flag
               */
              public static void setDebug(final boolean debuger) {
                  debug = debuger;
              }

              /**
               * Logs in with a principal and password.
               *
               * @param principal Kerberos principal name
               * @param password password supplied to the login module's password callback
               * @return the subject of the login context after a successful login
               * @throws LoginException if the login fails
               */
              public static Subject loginUsingPassword(final String principal, final String password)
                      throws LoginException {
                  final Set<Principal> principals = new HashSet<>();
                  principals.add(new KerberosPrincipal(principal));
                  final Subject subject = new Subject(false, principals, new HashSet<Object>(), new HashSet<Object>());
                  final Configuration conf = usePassword(principal);
                  final String confName = "PasswordConf";
                  final CallbackHandler callback = new KrbCallbackHandler(principal, password);
                  final LoginContext loginContext = new LoginContext(confName, subject, callback, conf);
                  loginContext.login();
                  return loginContext.getSubject();
              }

              /**
               * Logs in using an existing ticket cache.
               *
               * @param principal Kerberos principal name
               * @param cachePath path of the ticket cache file
               * @return the subject of the login context after a successful login
               * @throws LoginException if the login fails
               */
              public static Subject loginUsingTicketCache(final String principal, final Path cachePath)
                      throws LoginException {
                  final Set<Principal> principals = new HashSet<>();
                  principals.add(new KerberosPrincipal(principal));
                  final Subject subject = new Subject(false, principals, new HashSet<Object>(), new HashSet<Object>());
                  final Configuration conf = useTicketCache(principal, cachePath);
                  final String confName = "TicketCacheConf";
                  final LoginContext loginContext = new LoginContext(confName, subject, null, conf);
                  loginContext.login();
                  return loginContext.getSubject();
              }

              /**
               * Logs in using a keytab.
               *
               * <p>Every name in {@code principalAsStrings} is added to the subject as
               * a {@link KerberosPrincipal}; the login module itself is configured with
               * the last name produced by the set's iterator.
               *
               * @param principalAsStrings principal names; must contain at least one
               * @param keytabPath path of the keytab file
               * @param initiator value of the login module's {@code isInitiator} option
               * @return the subject of the login context after a successful login
               * @throws LoginException if no principal is supplied or the login fails
               */
              public static Subject loginUsingKeytab(final Set<String> principalAsStrings, final Path keytabPath,
                      final boolean initiator) throws LoginException {
                  if (principalAsStrings == null || principalAsStrings.isEmpty()) {
                      // Fail early instead of attempting a login with an empty principal name.
                      throw new LoginException("At least one principal is required for a keytab login");
                  }
                  final Set<Principal> principals = new HashSet<>();
                  String configuredPrincipal = "";
                  for (final String name : principalAsStrings) {
                      configuredPrincipal = name;
                      principals.add(new KerberosPrincipal(name));
                  }
                  final Subject subject = new Subject(false, principals, new HashSet<Object>(), new HashSet<Object>());
                  final Configuration conf = useKeytab(configuredPrincipal, keytabPath, initiator);
                  final String confName = "KeytabConf";
                  final LoginContext loginContext = new LoginContext(confName, subject, null, conf);
                  loginContext.login();
                  return loginContext.getSubject();
              }

              /**
               * @param principal Kerberos principal name
               * @return a JAAS configuration for password login
               */
              public static Configuration usePassword(final String principal) {
                  return new PasswordJaasConf(principal);
              }

              /**
               * @param principal Kerberos principal name
               * @param credentialPath path of the ticket cache file
               * @return a JAAS configuration for ticket-cache login
               */
              public static Configuration useTicketCache(final String principal, final Path credentialPath) {
                  return new TicketCacheJaasConf(principal, credentialPath);
              }

              /**
               * @param principal Kerberos principal name
               * @param keytabPath path of the keytab file
               * @param initiator value of the {@code isInitiator} option
               * @return a JAAS configuration for keytab login
               */
              public static Configuration useKeytab(final String principal, final Path keytabPath,
                      final boolean initiator) {
                  return new KeytabJaasConf(principal, keytabPath, initiator);
              }

              /** Picks the IBM login module when {@code java.vendor} contains "IBM", otherwise the Sun one. */
              private static String getKrb5LoginModuleName() {
                  return System.getProperty("java.vendor").contains("IBM")
                          ? "com.ibm.security.auth.module.Krb5LoginModule"
                          : "com.sun.security.auth.module.Krb5LoginModule";
              }

              /** Wraps the options into the single REQUIRED Krb5 login module entry every configuration returns. */
              private static AppConfigurationEntry[] singleRequiredEntry(final Map<String, String> options) {
                  return new AppConfigurationEntry[] { new AppConfigurationEntry(getKrb5LoginModuleName(),
                          AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options) };
              }

              /** JAAS configuration for a keytab-based login. */
              static class KeytabJaasConf extends Configuration {
                  private final String principal;
                  private final Path keytabPath;
                  private final boolean initiator;

                  public KeytabJaasConf(final String principal, final Path keytab, final boolean initiator) {
                      this.principal = principal;
                      this.keytabPath = keytab;
                      this.initiator = initiator;
                  }

                  @Override
                  public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
                      final Map<String, String> options = new HashMap<>();
                      options.put("keyTab", keytabPath.toAbsolutePath().toString());
                      options.put("principal", principal);
                      options.put("useKeyTab", "true");
                      options.put("storeKey", "true");
                      options.put("doNotPrompt", "true");
                      options.put("renewTGT", "false");
                      options.put("refreshKrb5Config", "true");
                      options.put("isInitiator", String.valueOf(initiator));
                      options.put("debug", String.valueOf(debug));
                      return singleRequiredEntry(options);
                  }
              }

              /** JAAS configuration for a ticket-cache-based login. */
              static class TicketCacheJaasConf extends Configuration {
                  private final String principal;
                  private final Path clientCredentialPath;

                  public TicketCacheJaasConf(final String principal, final Path clientCredentialPath) {
                      this.principal = principal;
                      this.clientCredentialPath = clientCredentialPath;
                  }

                  @Override
                  public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
                      final Map<String, String> options = new HashMap<>();
                      options.put("principal", principal);
                      options.put("storeKey", "false");
                      options.put("doNotPrompt", "false");
                      options.put("useTicketCache", "true");
                      options.put("renewTGT", "true");
                      options.put("refreshKrb5Config", "true");
                      options.put("isInitiator", "true");
                      options.put("ticketCache", clientCredentialPath.toAbsolutePath().toString());
                      options.put("debug", String.valueOf(debug));
                      return singleRequiredEntry(options);
                  }
              }

              /** JAAS configuration for a password-based login. */
              static class PasswordJaasConf extends Configuration {
                  private final String principal;

                  public PasswordJaasConf(final String principal) {
                      this.principal = principal;
                  }

                  @Override
                  public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
                      final Map<String, String> options = new HashMap<>();
                      options.put("principal", principal);
                      options.put("storeKey", "true");
                      options.put("useTicketCache", "true");
                      options.put("useKeyTab", "false");
                      options.put("renewTGT", "true");
                      options.put("refreshKrb5Config", "true");
                      options.put("isInitiator", "true");
                      options.put("debug", String.valueOf(debug));
                      return singleRequiredEntry(options);
                  }
              }

              /**
               * Callback handler that answers the first {@link PasswordCallback} whose
               * prompt contains the principal name with the stored password.
               */
              public static class KrbCallbackHandler implements CallbackHandler {
                  private final String principal;
                  private final String password;

                  public KrbCallbackHandler(final String principal, final String password) {
                      this.principal = principal;
                      this.password = password;
                  }

                  @Override
                  public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException {
                      for (final Callback callback : callbacks) {
                          if (callback instanceof PasswordCallback) {
                              final PasswordCallback pc = (PasswordCallback) callback;
                              if (pc.getPrompt().contains(principal)) {
                                  pc.setPassword(password.toCharArray());
                                  // Only the first matching prompt is answered.
                                  break;
                              }
                          }
                      }
                  }
              }
          }