I'm migrating from the HLRC to the new client, things were smooth but for some reason I cannot index a specific class/document. Here is my client implementation and index request:
@Configuration
public class ClientConfiguration{
@Autowired
private InternalProperties conf;
public ElasticsearchClient sslClient(){
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(conf.getElasticsearchUser(), conf.getElasticsearchPassword()));
HttpHost httpHost = new HttpHost(conf.getElasticsearchAddress(), conf.getElasticsearchPort(), "https");
RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
try {
SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, (x509Certificates, s) -> true).build();
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setSSLContext(sslContext)
.setDefaultCredentialsProvider(credentialsProvider);
}
});
} catch (Exception e) {
e.printStackTrace();
}
RestClient restClient=restClientBuilder.build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
return client;
}
}
@Service
public class ThisDtoIndexClass extends ConfigAndProperties{
public ThisDtoIndexClass() {
}
//client is declared in the class it's extending from
public ThisDtoIndexClass(@Autowired ClientConfiguration esClient) {
this.client = esClient.sslClient();
}
@KafkaListener(topics = "esTopic")
public void in(@Payload(required = false) customDto doc)
throws ThisDtoIndexClassException, ElasticsearchException, IOException {
if(doc!= null && doc.getId() != null) {
IndexRequest.Builder<customDto > indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index("index-for-this-Dto");
indexReqBuilder.id(doc.getId());
indexReqBuilder.document(doc);
IndexResponse response = client.index(indexReqBuilder.build());
} else {
throw new ThisDtoIndexClassException("document is null");
}
}
}
This is all done in spring boot (v2.6.8) with ES 7.17.3. According to the debug, the payload is NOT null! It even fetches the id correctly while stepping through. For some reason, it throws me a org.springframework.kafka.listener.ListenerExecutionFailedException: in the last line (during the .build?). Nothing gets indexed, but the response comes back 200. I'm lost on where I should be looking. I have a different class that also writes to a different index, also getting a payload from kafka directly (all seperate consumers). That one functions just fine.
I suspect it has something to do with the way my client is set up and/or the kafka. Please point me in the right direction.