2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.actors.client;
10 import com.google.common.annotations.Beta;
11 import java.util.Optional;
12 import java.util.concurrent.CompletionStage;
13 import javax.annotation.Nonnull;
14 import javax.annotation.Nullable;
15 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
16 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
17 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
18 import org.opendaylight.controller.cluster.access.concepts.RequestException;
19 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
20 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
21 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
22 import org.opendaylight.yangtools.concepts.Identifiable;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.duration.FiniteDuration;
28 * A behavior, which handles messages sent to a {@link AbstractClientActor}.
30 * @param <T> Frontend type
32 * @author Robert Varga
35 public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<ClientActorContext>
36 implements Identifiable<ClientIdentifier> {
37 private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
39 protected ClientActorBehavior(final @Nonnull ClientActorContext context) {
44 public final @Nonnull ClientIdentifier getIdentifier() {
45 return context().getIdentifier();
49 final ClientActorBehavior onReceiveCommand(final Object command) {
50 if (command instanceof InternalCommand) {
51 return ((InternalCommand) command).execute(this);
53 if (command instanceof SuccessEnvelope) {
54 return onRequestSuccess((SuccessEnvelope) command);
56 if (command instanceof FailureEnvelope) {
57 return onRequestFailure((FailureEnvelope) command);
60 return onCommand(command);
63 private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) {
64 return context().completeRequest(this, command);
67 private ClientActorBehavior onRequestFailure(final FailureEnvelope command) {
68 final RequestFailure<?, ?> failure = command.getMessage();
69 final RequestException cause = failure.getCause();
70 if (cause instanceof RetiredGenerationException) {
71 LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
73 context().poison(cause);
77 if (failure.isHardFailure()) {
78 return context().completeRequest(this, command);
81 // TODO: add instanceof checks on cause to detect more problems
83 LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), command);
84 return context().completeRequest(this, command);
87 // This method is executing in the actor context, hence we can safely interact with the queue
88 private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest<?> request,
89 final RequestCallback callback) {
90 // Get or allocate queue for the request
91 final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
93 // Note this is a tri-state return and can be null
94 final Optional<FiniteDuration> result = queue.enqueueRequest(sequence, request, callback);
96 // Happy path: we are done here
100 if (result.isPresent()) {
101 // Less happy path: we need to schedule a timer
102 scheduleQueueTimeout(queue, result.get());
106 startResolve(queue, request.getTarget().getHistoryId().getCookie());
110 // This method is executing in the actor context, hence we can safely interact with the queue
111 private void startResolve(final SequencedQueue queue, final long cookie) {
112 // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a
113 // previous request to resolve.
114 final CompletionStage<? extends BackendInfo> f = resolver().getBackendInfo(cookie);
116 // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has
117 // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to
118 // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and
119 // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will
120 // bulk-process all requests.
121 if (queue.expectProof(f)) {
122 f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend)));
126 // This method is executing in the actor context, hence we can safely interact with the queue
127 private ClientActorBehavior finishResolve(final SequencedQueue queue,
128 final CompletionStage<? extends BackendInfo> futureBackend, final BackendInfo backend) {
130 final Optional<FiniteDuration> maybeTimeout = queue.setBackendInfo(futureBackend, backend);
131 if (maybeTimeout.isPresent()) {
132 scheduleQueueTimeout(queue, maybeTimeout.get());
137 // This method is executing in the actor context, hence we can safely interact with the queue
138 private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) {
139 LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout);
140 context().executeInActor(cb -> cb.queueTimeout(queue), timeout);
143 // This method is executing in the actor context, hence we can safely interact with the queue
144 private ClientActorBehavior queueTimeout(final SequencedQueue queue) {
145 final boolean needBackend;
148 needBackend = queue.runTimeout();
149 } catch (NoProgressException e) {
150 // Uh-oh, no progress. The queue has already killed itself, now we need to remove it
151 context().removeQueue(queue);
156 startResolve(queue, queue.getCookie());
163 * Halt And Catch Fire.
165 * Halt processing on this client. Implementations need to ensure they initiate state flush procedures. No attempt
166 * to use this instance should be made after this method returns. Any such use may result in undefined behavior.
168 * @param cause Failure cause
170 protected abstract void haltClient(@Nonnull Throwable cause);
173 * Override this method to handle any command which is not handled by the base behavior.
176 * @return Next behavior to use, null if this actor should shut down.
178 protected abstract @Nullable ClientActorBehavior onCommand(@Nonnull Object command);
181 * Override this method to provide a backend resolver instance.
185 protected abstract @Nonnull BackendInfoResolver<?> resolver();
188 * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
191 * @param request Request to send
192 * @param callback Callback to invoke
194 public final void sendRequest(final long sequence, final TransactionRequest<?> request, final RequestCallback callback) {
195 context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback));