8939ec977e16dabcc6f9507950aa07a3d95c3d4c
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import com.google.common.annotations.Beta;
11 import java.util.Optional;
12 import java.util.concurrent.CompletionStage;
13 import javax.annotation.Nonnull;
14 import javax.annotation.Nullable;
15 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
16 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
17 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
18 import org.opendaylight.controller.cluster.access.concepts.RequestException;
19 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
20 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
21 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
22 import org.opendaylight.yangtools.concepts.Identifiable;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.duration.FiniteDuration;
26
27 /**
28  * A behavior, which handles messages sent to a {@link AbstractClientActor}.
29  *
30  * @author Robert Varga
31  */
32 @Beta
33 public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<ClientActorContext>
34         implements Identifiable<ClientIdentifier> {
35     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
36
37     protected ClientActorBehavior(final @Nonnull ClientActorContext context) {
38         super(context);
39     }
40
41     @Override
42     public final @Nonnull ClientIdentifier getIdentifier() {
43         return context().getIdentifier();
44     }
45
46     @Override
47     final ClientActorBehavior onReceiveCommand(final Object command) {
48         if (command instanceof InternalCommand) {
49             return ((InternalCommand) command).execute(this);
50         }
51         if (command instanceof SuccessEnvelope) {
52             return onRequestSuccess((SuccessEnvelope) command);
53         }
54         if (command instanceof FailureEnvelope) {
55             return onRequestFailure((FailureEnvelope) command);
56         }
57
58         return onCommand(command);
59     }
60
61     private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) {
62         return context().completeRequest(this, command);
63     }
64
65     private ClientActorBehavior onRequestFailure(final FailureEnvelope command) {
66         final RequestFailure<?, ?> failure = command.getMessage();
67         final RequestException cause = failure.getCause();
68         if (cause instanceof RetiredGenerationException) {
69             LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
70             haltClient(cause);
71             context().poison(cause);
72             return null;
73         }
74
75         if (failure.isHardFailure()) {
76             return context().completeRequest(this, command);
77         }
78
79         // TODO: add instanceof checks on cause to detect more problems
80
81         LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), command);
82         return context().completeRequest(this, command);
83     }
84
85     // This method is executing in the actor context, hence we can safely interact with the queue
86     private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
87         // Get or allocate queue for the request
88         final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
89
90         // Note this is a tri-state return and can be null
91         final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
92         if (result == null) {
93             // Happy path: we are done here
94             return this;
95         }
96
97         if (result.isPresent()) {
98             // Less happy path: we need to schedule a timer
99             scheduleQueueTimeout(queue, result.get());
100             return this;
101         }
102
103         startResolve(queue, request.getTarget().getHistoryId().getCookie());
104         return this;
105     }
106
107     // This method is executing in the actor context, hence we can safely interact with the queue
108     private void startResolve(final SequencedQueue queue, final long cookie) {
109         // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a
110         // previous request to resolve.
111         final CompletionStage<? extends BackendInfo> f = resolver().getBackendInfo(cookie);
112
113         // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has
114         // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to
115         // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and
116         // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will
117         // bulk-process all requests.
118         if (queue.expectProof(f)) {
119             f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend)));
120         }
121     }
122
123     // This method is executing in the actor context, hence we can safely interact with the queue
124     private ClientActorBehavior finishResolve(final SequencedQueue queue,
125             final CompletionStage<? extends BackendInfo> futureBackend, final BackendInfo backend) {
126
127         final Optional<FiniteDuration> maybeTimeout = queue.setBackendInfo(futureBackend, backend);
128         if (maybeTimeout.isPresent()) {
129             scheduleQueueTimeout(queue, maybeTimeout.get());
130         }
131         return this;
132     }
133
134     // This method is executing in the actor context, hence we can safely interact with the queue
135     private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) {
136         LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout);
137         context().executeInActor(cb -> cb.queueTimeout(queue), timeout);
138     }
139
140     // This method is executing in the actor context, hence we can safely interact with the queue
141     private ClientActorBehavior queueTimeout(final SequencedQueue queue) {
142         final boolean needBackend;
143
144         try {
145             needBackend = queue.runTimeout();
146         } catch (NoProgressException e) {
147             // Uh-oh, no progress. The queue has already killed itself, now we need to remove it
148             context().removeQueue(queue);
149             return this;
150         }
151
152         if (needBackend) {
153             startResolve(queue, queue.getCookie());
154         }
155
156         return this;
157     }
158
159     /**
160      * Halt And Catch Fire.
161      *
162      * Halt processing on this client. Implementations need to ensure they initiate state flush procedures. No attempt
163      * to use this instance should be made after this method returns. Any such use may result in undefined behavior.
164      *
165      * @param cause Failure cause
166      */
167     protected abstract void haltClient(@Nonnull Throwable cause);
168
169     /**
170      * Override this method to handle any command which is not handled by the base behavior.
171      *
172      * @param command
173      * @return Next behavior to use, null if this actor should shut down.
174      */
175     protected abstract @Nullable ClientActorBehavior onCommand(@Nonnull Object command);
176
177     /**
178      * Override this method to provide a backend resolver instance.
179      *
180      * @return
181      */
182     protected abstract @Nonnull BackendInfoResolver<?> resolver();
183
184     /**
185      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
186      * from any thread.
187      *
188      * @param request Request to send
189      * @param callback Callback to invoke
190      */
191     public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
192         context().executeInActor(cb -> cb.doSendRequest(request, callback));
193     }
194 }