Description
Feature description
TLDR - currently, when loading from Spark into Elastic via a load balancer, a single timeout can cause the entire Spark task to fail without any retries.
On several occasions I've had to work with architectures where a lot of data is loaded from Spark into Elastic, with all traffic routed through a single load balancer before being forwarded to the various Elastic cluster nodes (usually round-robin'd).
This isn't an architecture I'd recommend for various reasons, but organisations sometimes favour it because it allows them to centralise security/audit at the load balancer - and achieve a lot of that "for free" with something like Istio if they are using Kubernetes.
With this architecture, the load config looks like this:
es.nodes = <load balancer>
es.nodes.discovery = false
es.nodes.wan.only = true
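For context, this is roughly how that config ends up wired into a Spark job - a minimal sketch, assuming the elasticsearch-spark SQL connector is on the classpath; the host, input path and index name are just placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch - "es-load-balancer:9200", the input path and "my-index" are placeholders.
val spark = SparkSession.builder()
  .appName("bulk-load-to-es")
  .config("es.nodes", "es-load-balancer:9200") // the single load balancer endpoint
  .config("es.nodes.discovery", "false")       // don't try to discover the nodes behind it
  .config("es.nodes.wan.only", "true")         // only ever connect to the declared node(s)
  .getOrCreate()

spark.read.parquet("/path/to/input")
  .write
  .format("org.elasticsearch.spark.sql")
  .mode("append")
  .save("my-index")
```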
One thing I often see when loading from Spark with this architecture is that the logic in NetworkClient.java
for handling timeouts and other exceptions can cause instability. The logic appears to me to have two failure scenarios (roughly sketched after the list below):
- HTTP response received, but the Elastic response indicates that some or all of the docs weren't indexed. Only the un-indexed docs are retried, governed by es.batch.write.retry.count.
- Exception case - this usually occurs when no HTTP response is received before the configured es.http.timeout -> try the next available node (or, if none are left, re-throw the timeout as an "all nodes failed" exception).
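To make the gap concrete, here is a very rough, simplified sketch of how I read that flow - illustrative Scala, not the actual Java in NetworkClient.java; `send` and `retryRejected` are placeholder stand-ins for the real HTTP call and the per-document bulk retry:

```scala
import scala.util.control.NonFatal

// Illustrative only - placeholder stand-ins, not the real NetworkClient code.
def execute[Resp](nodes: List[String])(send: String => Resp)(retryRejected: Resp => Resp): Resp =
  nodes match {
    case Nil =>
      // no "next node" left -> surfaces as EsHadoopNoNodesLeftException and the task fails
      throw new RuntimeException("all nodes failed")
    case node :: remaining =>
      try {
        // case 1: HTTP response received; any rejected docs are retried individually,
        // governed by es.batch.write.retry.count
        retryRejected(send(node))
      } catch {
        // case 2: no HTTP response before es.http.timeout -> move on to the next node
        case NonFatal(_) => execute(remaining)(send)(retryRejected)
      }
  }
```

With es.nodes set to just the load balancer, `nodes` has exactly one element, so the very first exception falls straight through to the "all nodes failed" branch.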
The issue appears to be that the NetworkClient is only aware of a single node, the load balancer. In case 2, when an exception occurs, the entire Spark task fails like this:
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[<load-balancer>]]
This is because there is no "next node" for it to retry the timed out request on.
The Spark task can and will be restarted up until spark.task.maxFailures is hit, but each restart results in any data already loaded within the task being reloaded (something that does not happen in failure scenario 1). This can have a big impact, especially when Spark tasks are quite large, and it also causes unnecessary deletes by overwriting existing documents; those deletes need to be expunged later.
I was wondering if it might be worth adding some configurable number of retries for the exception case as well as the "incomplete response" case? I'd be happy to raise a PR for this, but wanted to get an opinion first and make sure I'm on the right track! I suppose this might look something like:
<cycle through the available nodes one by one - if you reach the end of the list, go back to the start, up to N times>
where N is the new configuration setting.
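Under the same placeholder assumptions as the sketch above, the proposed behaviour might look roughly like this - the setting name and wiring are entirely hypothetical, and the only real change is cycling the node list up to N times instead of once:

```scala
import scala.util.{Success, Try}

// Hypothetical sketch of the proposal: walk the node list up to maxSweeps (N) times,
// where N would come from the new configuration setting.
def executeWithSweeps[Resp](nodes: Seq[String], maxSweeps: Int)(send: String => Resp): Resp =
  Iterator
    .continually(nodes)
    .take(maxSweeps)                    // at most N passes over the node list
    .flatten
    .map(node => Try(send(node)))       // a timeout/exception surfaces here as a Failure
    .collectFirst { case Success(resp) => resp }
    .getOrElse(throw new RuntimeException(
      s"all nodes failed; tried ${nodes.mkString("[", ", ", "]")} $maxSweeps time(s)"))
```

In the single-load-balancer setup, that would give each failed request up to N fresh attempts against the load balancer before the Spark task is failed, rather than failing on the first timeout.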