前言
本篇文章主要介绍:使用Basic、Transport、Kerberos三种方式连接ES
Basic方式
/** * Basic方式(带用户名和密码方式访问) * @param user 用户名 * @param password 密码 * @param index 索引名 */ private static void basicConnect(String user, String password,String index) { try { System.out.println("Basic访问start........................"); final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(user, password)); System.out.println("用户为:"+user+" ,密码为:"+password); client = new RestHighLevelClient( RestClient.builder(new HttpHost(PropertiesUtil.esHost, PropertiesUtil.esPort)) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { //httpClientBuilder.disableAuthCaching(); return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } })); } catch (Exception e) { e.printStackTrace(); } //访问所有索引 PropertiesUtil.getIndics(client); //访问指定索引 PropertiesUtil.getIndex(client,index); System.out.println("Basic访问end........................"); }
Transport方式
/**
 * Connects to Elasticsearch through the transport protocol (port 9300) and creates
 * the given index when it does not exist yet.
 *
 * <p>The request carries a Basic {@code Authorization} header built from the
 * hard-coded demo account {@code admin}/{@code admin}.
 *
 * @param index name of the index to create if absent
 */
public static void transportConnect(String index) {
    try {
        System.out.println("transport访问start........................");
        // Resolve the address before the client exists so that a lookup failure
        // cannot leave an unclosed client behind.
        TransportAddress address =
                new TransportAddress(InetAddress.getByName(PropertiesUtil.esHost), 9300);
        // The header value is intentionally not printed: it is only Base64-encoded
        // and would expose the credentials.
        String authorization = "Basic " + Base64.getEncoder()
                .encodeToString("admin:admin".getBytes(StandardCharsets.UTF_8));

        // try-with-resources releases the client's threads and connections on every path.
        try (TransportClient client = new PreBuiltTransportClient(getESSetting())) {
            client.addTransportAddress(address);
            client.threadPool().getThreadContext().putHeader("Authorization", authorization);
            System.out.println("获取client成功........................");

            System.out.println("尝试创建index:" + index);
            if (!PropertiesUtil.isExistIndex(client, index)) {
                CreateIndexResponse cir =
                        client.admin().indices().create(new CreateIndexRequest(index)).actionGet();
                System.out.println(cir.isAcknowledged());
            } else {
                System.out.println(index + "已存在");
            }
            System.out.println("transport访问end........................");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Builds the transport client settings.
 *
 * @return settings containing only {@code cluster.name}, taken from {@code PropertiesUtil.clusterName}
 */
public static Settings getESSetting() {
    return Settings.builder()
            .put("cluster.name", PropertiesUtil.clusterName)
            .build();
}
Kerberos方式
/** * 认证kerberos访问es * @param index 索引名 */ public static void kerberosConnect(String index) { RestHighLevelClient restHighLevelClient = null; try { System.out.println("kerberos访问start........................"); System.out.println("开始认证........................"); SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler(PropertiesUtil.principal, PropertiesUtil.ketTabPath, true); System.out.println("认证成功........................"); //业务逻辑开始 Listhosts = new ArrayList<>(); HttpHost hostNew = new HttpHost(PropertiesUtil.esHost, PropertiesUtil.esPort, "http"); hosts.add(hostNew); HttpHost[] httpHosts = hosts.toArray(new HttpHost[0]); RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); restClientBuilder.setHttpClientConfigCallback(callbackHandler); restHighLevelClient = new RestHighLevelClient(restClientBuilder); //测试获取所有的索引 System.out.println("获取所有的索引........................"); PropertiesUtil.getIndics(restHighLevelClient); System.out.println("获取" + index + "索引数据........................"); PropertiesUtil.getIndex(restHighLevelClient,index); System.out.println("kerberos访问end........................................."); } catch (Exception e) { e.printStackTrace(); }finally { if(restHighLevelClient !=null){ try { restHighLevelClient.close(); } catch (IOException e) { e.printStackTrace(); } } } }
正常访问
/**
 * Connects to Elasticsearch without any authentication, lists all indices and dumps
 * the hits of the given index.
 *
 * <p>The created client is stored in the shared static {@code client} field, and
 * {@code closeRes()} is always invoked afterwards (presumably to release that
 * client; its definition is not shown alongside this method).
 *
 * @param index name of the index to query
 */
private static void rnormalConnect(String index) {
    try {
        System.out.println("正常访问start.........................................");
        // The single host is passed straight to the builder, so no intermediate
        // list/array conversion is needed.
        RestClientBuilder restClientBuilder =
                RestClient.builder(new HttpHost(PropertiesUtil.esHost, PropertiesUtil.esPort, "http"));
        Header[] defaultHeaders = new Header[]{
                new BasicHeader("Accept", "application/json"),
                new BasicHeader("Content-type", "application/json")};
        restClientBuilder.setDefaultHeaders(defaultHeaders);

        client = new RestHighLevelClient(restClientBuilder);
        PropertiesUtil.getIndics(client);
        PropertiesUtil.getIndex(client, index);
        System.out.println("正常访问end.........................................");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        closeRes();
    }
}
综合测试
EsConnectDemo
public class EsConnectDemo { private static RestHighLevelClient client; public static void main(String[] args) { try { //测试kerberos是否可以认证 kerberos(); if (args.length != 1) { System.out.println("请输入要测试的索引名称"); } String index = args[0]; //ES 功能验证 busisses(index); } catch (Exception e) { e.printStackTrace(); } } /** * 验证当前kerberos是否可以认证 */ public static void kerberos() { try { System.setProperty("http.auth.preference","Kerberos"); System.setProperty("java.security.krb5.conf",PropertiesUtil.krb5ConfPath); System.setProperty("sun.security.krb5.debug", "false"); System.setProperty("sun.security.spnego.debug", "false"); String acceptorPrincipal = PropertiesUtil.principal; Path acceptorKeyTabPath = Paths.get(PropertiesUtil.ketTabPath); Set
set = new HashSet<>(); set.add(acceptorPrincipal); final Subject subject = JaasKrbUtil.loginUsingKeytab(set, acceptorKeyTabPath, true); Set 配置文件
kerberos.properties
principal=example@HADOOP.COM keytabPath=/etc/keytabs/es.service.keytab krb5ConfPath=/etc/krb5.conf esHost=hostname esPort=9200 clusterName=dkjhl
文中提到的工具类
PropertiesUtil
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Loads {@code kerberos.properties} from the working directory and exposes its values,
 * together with a few small Elasticsearch helpers used by the connection demos.
 */
public class PropertiesUtil {

    /** Raw properties; stays empty when the file cannot be read. */
    public static Properties prop = new Properties();

    // Must run before the field initialisers below, which read from prop.
    static {
        try (InputStream is = new FileInputStream("kerberos.properties")) {
            prop.load(is);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("找不到kerberos.properties文件!");
        }
    }

    public static String ketTabPath = prop.getProperty("keytabPath");
    public static String krb5ConfPath = prop.getProperty("krb5ConfPath");
    public static String principal = prop.getProperty("principal");
    public static String esHost = prop.getProperty("esHost");
    // Falls back to 9200 (the port used in the sample configuration) when the key or
    // the whole file is missing; parsing null would otherwise abort class initialisation.
    public static int esPort = Integer.parseInt(prop.getProperty("esPort", "9200").trim());
    public static String clusterName = prop.getProperty("clusterName");

    /**
     * Checks whether an index exists.
     *
     * @param client    transport client
     * @param indexName index name
     * @return {@code true} if the index exists; {@code false} if it does not or the check failed
     */
    public static boolean isExistIndex(TransportClient client, String indexName) {
        boolean isExistIndex = false;
        try {
            isExistIndex = client.admin().indices()
                    .exists(new IndicesExistsRequest().indices(indexName))
                    .actionGet().isExists();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return isExistIndex;
    }

    /**
     * Prints the name of every index, one per line, using the keys of the alias lookup response.
     *
     * @param client REST client
     */
    public static void getIndics(RestHighLevelClient client) {
        try {
            GetAliasesRequest request = new GetAliasesRequest();
            GetAliasesResponse getAliasesResponse = client.indices().getAlias(request, RequestOptions.DEFAULT);
            Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
            Set<String> indices = map.keySet();
            for (String key : indices) {
                System.out.println(key);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs a search with an empty {@link SearchSourceBuilder} against the given index
     * and prints every returned hit.
     *
     * @param client REST client
     * @param index  index name
     */
    public static void getIndex(RestHighLevelClient client, String index) {
        try {
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder builder = new SearchSourceBuilder();
            searchRequest.source(builder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                System.out.println(hit);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
JaasKrbUtil
//Source: Apache Kerby project //https://directory.apache.org/kerby/ import javax.security.auth.Subject; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.kerberos.KerberosPrincipal; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; import java.io.IOException; import java.nio.file.Path; import java.security.Principal; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * JAAS utilities for Kerberos login. */ public final class JaasKrbUtil { private static boolean debug = false; private JaasKrbUtil() { } public static void setDebug(final boolean debuger) { debug = debuger; } public static Subject loginUsingPassword(final String principal, final String password) throws LoginException { final Set
principals = new HashSet (); principals.add(new KerberosPrincipal(principal)); final Subject subject = new Subject(false, principals, new HashSet SpnegoHttpClientConfigCallbackHandler
import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; import org.apache.http.auth.Credentials; import org.apache.http.auth.KerberosCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.AuthSchemes; import org.apache.http.config.Lookup; import org.apache.http.config.RegistryBuilder; import org.apache.http.impl.auth.SPNegoSchemeFactory; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.elasticsearch.common.settings.SecureString; import org.ietf.jgss.*; import javax.security.auth.Subject; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.kerberos.KerberosPrincipal; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import java.io.IOException; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class SpnegoHttpClientConfigCallbackHandler implements HttpClientConfigCallback { private static final String SUN_KRB5_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule"; private static final String CRED_CONF_NAME = "ESClientLoginConf"; private static final Oid SPNEGO_OID = getSpnegoOid(); private static Oid getSpnegoOid() { Oid oid = null; try { oid = new Oid("1.3.6.1.5.5.2"); } catch (GSSException gsse) { throw ExceptionsHelper.convertToRuntime(gsse); } return oid; } private final String userPrincipalName; private final SecureString password; private 
final String keytabPath; private final boolean enableDebugLogs; private LoginContext loginContext; /** * principalName and password. * * @param userPrincipalName user principal name * @param password password for user * @param enableDebugLogs if {@code true} enables kerberos debug logs */ public SpnegoHttpClientConfigCallbackHandler(final String userPrincipalName, final SecureString password, final boolean enableDebugLogs) { this.userPrincipalName = userPrincipalName; this.password = password; this.keytabPath = null; this.enableDebugLogs = enableDebugLogs; } /** * principalName and keytab. * * @param userPrincipalName User principal name * @param keytabPath path to keytab file for user * @param enableDebugLogs if {@code true} enables kerberos debug logs */ public SpnegoHttpClientConfigCallbackHandler(final String userPrincipalName, final String keytabPath, final boolean enableDebugLogs) { this.userPrincipalName = userPrincipalName; this.keytabPath = keytabPath; this.password = null; this.enableDebugLogs = enableDebugLogs; } @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { setupSpnegoAuthSchemeSupport(httpClientBuilder); return httpClientBuilder; } private void setupSpnegoAuthSchemeSupport(HttpAsyncClientBuilder httpClientBuilder) { final Lookup authSchemeRegistry = RegistryBuilder.create() .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build(); final GSSManager gssManager = GSSManager.getInstance(); try { final GSSName gssUserPrincipalName = gssManager.createName(userPrincipalName, GSSName.NT_USER_NAME); login(); final AccessControlContext acc = AccessController.getContext(); final GSSCredential credential = doAsPrivilegedWrapper(loginContext.getSubject(), (PrivilegedExceptionAction
) () -> gssManager.createCredential(gssUserPrincipalName, GSSCredential.DEFAULT_LIFETIME, SPNEGO_OID, GSSCredential.INITIATE_ONLY), acc); final KerberosCredentialsProvider credentialsProvider = new KerberosCredentialsProvider(); credentialsProvider.setCredentials( new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, AuthSchemes.SPNEGO), new KerberosCredentials(credential)); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } catch (GSSException e) { throw new RuntimeException(e); } catch (PrivilegedActionException e) { throw new RuntimeException(e.getCause()); } /** 2022-08-16 update options : java.io.IOException:listener timeout after waiting for [xxxxx] ms start **/ // httpClientBuilder.setKeepAliveStrategy((httpResponse,httpContext) -> Duration.ofMinutes(5).toMillis()); // 时间的不同设置 // httpClientBuilder.setKeepAliveStrategy((httpResponse,httpContext) -> TimeUnit.MINUTES.toMillis(3)); // httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build()); /** 2022-08-16 update options : java.io.IOException:listener timeout after waiting for [xxxxx] ms end **/ httpClientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry); } /** * If logged in {@link LoginContext} is not available, it attempts login and * returns {@link LoginContext} * * @return {@link LoginContext} * @throws PrivilegedActionException */ public synchronized LoginContext login() throws PrivilegedActionException { if (this.loginContext == null) { AccessController.doPrivileged((PrivilegedExceptionAction ) () -> { final Subject subject = new Subject(false, Collections.singleton(new KerberosPrincipal(userPrincipalName)), Collections.emptySet(), Collections.emptySet()); Configuration conf = null; final CallbackHandler callback; if (password != null) { conf = new PasswordJaasConf(userPrincipalName, enableDebugLogs); callback = new KrbCallbackHandler(userPrincipalName, password); } else { conf = new 
KeytabJaasConf(userPrincipalName, keytabPath, enableDebugLogs); callback = null; } loginContext = new LoginContext(CRED_CONF_NAME, subject, callback, conf); loginContext.login(); return null; }); } return loginContext; } /** * Privileged Wrapper that invokes action with Subject.doAs to perform work as * given subject. * * @param subject {@link Subject} to be used for this work * @param action {@link PrivilegedExceptionAction} action for performing inside * Subject.doAs * @param acc the {@link AccessControlContext} to be tied to the specified * subject and action see * {@link Subject#doAsPrivileged(Subject, PrivilegedExceptionAction, AccessControlContext) * @return the value returned by the PrivilegedExceptionAction's run method * @throws PrivilegedActionException */ static T doAsPrivilegedWrapper(final Subject subject, final PrivilegedExceptionAction action, final AccessControlContext acc) throws PrivilegedActionException { try { return AccessController.doPrivileged((PrivilegedExceptionAction ) () -> Subject.doAsPrivileged(subject, action, acc)); } catch (PrivilegedActionException pae) { if (pae.getCause() instanceof PrivilegedActionException) { throw (PrivilegedActionException) pae.getCause(); } throw pae; } } /** * This class matches {@link AuthScope} and based on that returns * {@link Credentials}. 
Only supports {@link AuthSchemes#SPNEGO} in * {@link AuthScope#getScheme()} */ private static class KerberosCredentialsProvider implements CredentialsProvider { private AuthScope authScope; private Credentials credentials; @Override public void setCredentials(AuthScope authscope, Credentials credentials) { if (authscope.getScheme().regionMatches(true, 0, AuthSchemes.SPNEGO, 0, AuthSchemes.SPNEGO.length()) == false) { throw new IllegalArgumentException("Only " + AuthSchemes.SPNEGO + " auth scheme is supported in AuthScope"); } this.authScope = authscope; this.credentials = credentials; } @Override public Credentials getCredentials(AuthScope authscope) { assert this.authScope != null && authscope != null; return authscope.match(this.authScope) > -1 ? this.credentials : null; } @Override public void clear() { this.authScope = null; this.credentials = null; } } /** * Jaas call back handler to provide credentials. */ private static class KrbCallbackHandler implements CallbackHandler { private final String principal; private final SecureString password; KrbCallbackHandler(final String principal, final SecureString password) { this.principal = principal; this.password = password; } public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException { for (Callback callback : callbacks) { if (callback instanceof PasswordCallback) { PasswordCallback pc = (PasswordCallback) callback; if (pc.getPrompt().contains(principal)) { pc.setPassword(password.getChars()); break; } } } } } /** * Usually we would have a JAAS configuration file for login configuration. * Instead of an additional file setting as we do not want the options to be * customizable we are constructing it in memory. * * As we are using this instead of jaas.conf, this requires refresh of * {@link Configuration} and reqires appropriate security permissions to do so. 
*/ private static class PasswordJaasConf extends AbstractJaasConf { PasswordJaasConf(final String userPrincipalName, final boolean enableDebugLogs) { super(userPrincipalName, enableDebugLogs); } public void addOptions(final Map
options) { options.put("useTicketCache", Boolean.FALSE.toString()); options.put("useKeyTab", Boolean.FALSE.toString()); } } /** * Usually we would have a JAAS configuration file for login configuration. As * we have static configuration except debug flag, we are constructing in * memory. This avoids additional configuration required from the user. * * As we are using this instead of jaas.conf, this requires refresh of * {@link Configuration} and requires appropriate security permissions to do so. */ private static class KeytabJaasConf extends AbstractJaasConf { private final String keytabFilePath; KeytabJaasConf(final String userPrincipalName, final String keytabFilePath, final boolean enableDebugLogs) { super(userPrincipalName, enableDebugLogs); this.keytabFilePath = keytabFilePath; } public void addOptions(final Map
options) { options.put("useKeyTab", Boolean.TRUE.toString()); options.put("keyTab", keytabFilePath); options.put("doNotPrompt", Boolean.TRUE.toString()); } } private abstract static class AbstractJaasConf extends Configuration { private final String userPrincipalName; private final boolean enableDebugLogs; AbstractJaasConf(final String userPrincipalName, final boolean enableDebugLogs) { this.userPrincipalName = userPrincipalName; this.enableDebugLogs = enableDebugLogs; } @Override public AppConfigurationEntry[] getAppConfigurationEntry(final String name) { final Map options = new HashMap<>(); options.put("principal", userPrincipalName); options.put("isInitiator", Boolean.TRUE.toString()); options.put("storeKey", Boolean.TRUE.toString()); options.put("debug", Boolean.toString(enableDebugLogs)); addOptions(options); return new AppConfigurationEntry[] { new AppConfigurationEntry(SUN_KRB5_LOGIN_MODULE, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, Collections.unmodifiableMap(options)) }; } abstract void addOptions(Map options); } }
猜你喜欢
- 14小时前linux搭建LAMP服务
- 14小时前kafka基础知识总结
- 14小时前计算机毕业设计 基于Hadoop的物品租赁系统的设计与实现 Java实战项目 附源码+文档+视频讲解
- 14小时前打败一切NeRF! 3D Gaussian Splatting 的 简单入门知识
- 14小时前3D Gaussian Splatting:用于实时的辐射场渲染
- 11小时前准备好了吗英文(准备好了吗英文咋说)
- 11小时前你是我的优乐美(你是我的优乐美是什么歌)
- 6小时前aabc的词语有哪些大全(aabc式的词语有什么?)
- 5小时前隔离开关是指承担接通和断开电流任务(隔离开关指承担接通和断开电流任务,将电路与电源断开)
- 4小时前xp3用什么模拟器打开(xp模拟器怎么用)
网友评论
- 搜索
- 最新文章
- 热门文章