
[ML] GetDatafeedRunningStateAction can fail if multiple local datafeed tasks exist #104160

@droberts195

Description

The following exception trace was observed in a serverless project:

java.lang.IllegalStateException: Duplicate key datafeed-v3_linux_system_user_discovery (attempted merging values org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction$Response$RunningState@7d2c9ddc and org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction$Response$RunningState@131739)
	at java.base/java.util.stream.Collectors.duplicateKeyException(Collectors.java:135)
	at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:182)
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
	at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
	at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at org.elasticsearch.xcore@8.13.0/org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction$Response.fromResponses(GetDatafeedRunningStateAction.java:154)
	at org.elasticsearch.ml@8.13.0/org.elasticsearch.xpack.ml.action.TransportGetDatafeedRunningStateAction.newResponse(TransportGetDatafeedRunningStateAction.java:77)
	at org.elasticsearch.ml@8.13.0/org.elasticsearch.xpack.ml.action.TransportGetDatafeedRunningStateAction.newResponse(TransportGetDatafeedRunningStateAction.java:38)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.tasks.TransportTasksAction$1.onCompletion(TransportTasksAction.java:146)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.tasks.TransportTasksAction$1.onCompletion(TransportTasksAction.java:91)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:268)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.CancellableFanOut.lambda$run$0(CancellableFanOut.java:63)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.CancellableFanOut$SubtasksCompletionHandler.run(CancellableFanOut.java:197)
	at org.elasticsearch.base@8.13.0/org.elasticsearch.core.AbstractRefCounted$1.closeInternal(AbstractRefCounted.java:118)
	at org.elasticsearch.base@8.13.0/org.elasticsearch.core.AbstractRefCounted.decRef(AbstractRefCounted.java:70)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.RefCountingRunnable.close(RefCountingRunnable.java:112)
	at org.elasticsearch.base@8.13.0/org.elasticsearch.core.IOUtils.close(IOUtils.java:71)
	at org.elasticsearch.base@8.13.0/org.elasticsearch.core.Releasables.close(Releasables.java:34)
	at org.elasticsearch.base@8.13.0/org.elasticsearch.core.Releasables.closeExpectNoException(Releasables.java:58)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListenerImplementations$2.run(ActionListenerImplementations.java:49)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListenerImplementations$RunAfterActionListener.onResponse(ActionListenerImplementations.java:262)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:49)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.TransportService$UnregisterChildTransportResponseHandler.handleResponse(TransportService.java:1709)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1425)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:433)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.handleResponse(InboundHandler.java:382)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.executeResponseHandler(InboundHandler.java:147)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:122)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:96)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:825)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:124)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:96)
	at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:61)
	at org.elasticsearch.transport.netty4@8.13.0/org.elasticsearch.transport.netty4.Netty4MessageInboundHandler.channelRead(Netty4MessageInboundHandler.java:48)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.codec@4.1.94.Final/io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler@4.1.94.Final/io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383)
	at io.netty.handler@4.1.94.Final/io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246)
	at io.netty.handler@4.1.94.Final/io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295)
	at io.netty.codec@4.1.94.Final/io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.codec@4.1.94.Final/io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.codec@4.1.94.Final/io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.transport@4.1.94.Final/io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.transport@4.1.94.Final/io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.transport@4.1.94.Final/io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
	at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
	at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.common@4.1.94.Final/io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.common@4.1.94.Final/io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.base/java.lang.Thread.run(Thread.java:1583)

When GetDatafeedRunningStateAction fails, the whole GetDatafeedStatsAction that called it fails too.

Analysis of the logs shows that the cause of the duplicate was as follows:

  1. Datafeed was force-stopped - persistent task is deleted from cluster state
  2. Local persistent task executor on ML node notices and cancels the corresponding local task
  3. Local cancellation takes a long time (10+ minutes) because repeated circuit breaker exceptions on the search and indexing nodes cause repeated retries - normally cancelling the local task is much quicker, which is why this issue hasn't been seen before
  4. Datafeed is restarted before the first local task is cancelled - this is possible because lack of a persistent task in cluster state is taken to mean that the datafeed isn't running
  5. A second local task for the same datafeed is started, so there are now two running on the ML node: one cancelled, one starting up
  6. Get datafeed stats is called during this overlap period, calls get datafeed running state as a sub-action, and that gets results from both tasks, leading to the duplicate key error
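The following minimal sketch reproduces the failure mode. It assumes, based on the `Collectors.uniqKeysMapAccumulator` frame beneath `fromResponses` in the trace, that the per-task state maps are flattened and collected with the two-argument `Collectors.toMap`, which throws `IllegalStateException` on any duplicate key. `DuplicateKeyDemo` and its single-field `RunningState` record are hypothetical simplifications, not the real Elasticsearch classes.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {
    // Hypothetical, simplified stand-in for the per-datafeed running state.
    public record RunningState(String phase) {}

    // Assumed shape of the failing merge: flatten one map per task response, then
    // collect with the two-argument toMap, which has no merge function.
    public static Map<String, RunningState> fromResponses(List<Map<String, RunningState>> perTaskStates) {
        return perTaskStates.stream()
            .flatMap(states -> states.entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        String id = "datafeed-v3_linux_system_user_discovery";
        List<Map<String, RunningState>> perTaskStates = List.of(
            Map.of(id, new RunningState("cancelling")), // old task, cancellation still in progress
            Map.of(id, new RunningState("starting")));  // new task created by the restart
        try {
            fromResponses(perTaskStates);
        } catch (IllegalStateException e) {
            // Same failure mode as the trace: "Duplicate key <id> (attempted merging values ...)"
            System.out.println(e.getMessage());
        }
    }
}
```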

We can avoid this problem by sorting the responses by task creation time and, when there is more than one for the same datafeed, keeping only the state from the most recent task.
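The proposed fix can be sketched as follows, assuming each task-level response exposes its task's creation time. The `TaskResponse` record, its `taskStartTimeMillis` field and the single-field `RunningState` are hypothetical stand-ins; the real response classes differ. Sorting oldest first and passing a merge function that keeps the later value means the state from the most recently started task wins.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LatestTaskWins {
    // Hypothetical, simplified types; not the real Elasticsearch response classes.
    public record RunningState(String phase) {}
    public record TaskResponse(long taskStartTimeMillis, Map<String, RunningState> states) {}

    public static Map<String, RunningState> fromResponses(List<TaskResponse> responses) {
        return responses.stream()
            // Oldest task first, so a later entry for the same datafeed id comes from a newer task.
            .sorted(Comparator.comparingLong(TaskResponse::taskStartTimeMillis))
            .flatMap(r -> r.states().entrySet().stream())
            // On a duplicate key keep the incoming value, i.e. the state from the newer task.
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (older, newer) -> newer));
    }

    public static void main(String[] args) {
        String id = "datafeed-v3_linux_system_user_discovery";
        List<TaskResponse> responses = List.of(
            new TaskResponse(2_000L, Map.of(id, new RunningState("starting"))),    // restarted task
            new TaskResponse(1_000L, Map.of(id, new RunningState("cancelling")))); // old task, still cancelling
        System.out.println(fromResponses(responses).get(id)); // prints RunningState[phase=starting]
    }
}
```

Sorting up front is needed in this sketch because `RunningState` itself carries no timestamp, so the merge function can only rely on encounter order.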
