BUG-5280: expose queue messages during reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractDataStoreClientBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.base.Throwables;
13 import com.google.common.base.Verify;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.atomic.AtomicLong;
19 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
20 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
21 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
22 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
23 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
30  * frontend.
31  *
32  * <p>
33  * This class is not visible outside of this package because it breaks the actor containment. Services provided to
34  * Java world outside of actor containment are captured in {@link DataStoreClient}.
35  *
36  * <p>
37  * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
38  *            When touching internal state, be mindful of the execution context from which execution context, Actor
39  *            or POJO, is the state being accessed or modified.
40  *
41  * <p>
42  * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
43  *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
44  *                doubt, feel free to synchronize on this object.
45  *
46  * <p>
47  * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
48  *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
49  *              for correctness and performance impact.
50  *
51  * <p>
52  * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
53  *             for performing work and charging applications for it. That has two positive effects:
54  *             - CPU usage is distributed across applications, minimizing work done in the actor thread
55  *             - CPU usage provides back-pressure towards the application.
56  *
57  * @author Robert Varga
58  */
59 abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
60         implements DataStoreClient {
61     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
62
63     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
64     private final AtomicLong nextHistoryId = new AtomicLong(1);
65     private final SingleClientHistory singleHistory;
66
67     private volatile Throwable aborted;
68
69     AbstractDataStoreClientBehavior(final ClientActorContext context,
70             final BackendInfoResolver<ShardBackendInfo> resolver) {
71         super(context, resolver);
72         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
73     }
74
75     //
76     //
77     // Methods below are invoked from the client actor thread
78     //
79     //
80
81     @Override
82     protected final void haltClient(final Throwable cause) {
83         // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up
84         // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
85         // thread.
86         if (aborted != null) {
87             abortOperations(cause);
88         }
89     }
90
91     private void abortOperations(final Throwable cause) {
92         // This acts as a barrier, application threads check this after they have added an entry in the maps,
93         // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
94         aborted = cause;
95
96         for (ClientLocalHistory h : histories.values()) {
97             h.localAbort(cause);
98         }
99         histories.clear();
100     }
101
102     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
103         abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
104         return null;
105     }
106
107     @Override
108     protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
109         if (command instanceof GetClientRequest) {
110             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
111         } else {
112             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
113         }
114
115         return this;
116     }
117
118     /*
119      * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more
120      * involved, as the messages need to be replayed to the individual proxies.
121      */
122     @Override
123     protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
124         // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
125         //         further TransactionProxies can be created and we can safely traverse maps without risking
126         //         missing an entry
127         final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
128         startReconnect(singleHistory, newConn, cohorts);
129         for (ClientLocalHistory h : histories.values()) {
130             startReconnect(h, newConn, cohorts);
131         }
132
133         return previousEntries -> {
134             try {
135                 // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
136                 //         the non-throttling interface to the connection, hence we use a wrapper consumer
137                 for (HistoryReconnectCohort c : cohorts) {
138                     c.replaySuccessfulRequests(previousEntries);
139                 }
140
141                 // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
142                 //         requests will be immediately sent to it and requests being sent concurrently will get
143                 //         forwarded once they hit the new connection.
144                 return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
145             } finally {
146                 // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
147                 for (HistoryReconnectCohort c : cohorts) {
148                     c.close();
149                 }
150             }
151         };
152     }
153
154     private static void startReconnect(final AbstractClientHistory history,
155             final ConnectedClientConnection<ShardBackendInfo> newConn,
156             final Collection<HistoryReconnectCohort> cohorts) {
157         final HistoryReconnectCohort cohort = history.startReconnect(newConn);
158         if (cohort != null) {
159             cohorts.add(cohort);
160         }
161     }
162
163     //
164     //
165     // Methods below are invoked from application threads
166     //
167     //
168
169     @Override
170     public final ClientLocalHistory createLocalHistory() {
171         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
172             nextHistoryId.getAndIncrement());
173         final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
174         LOG.debug("{}: creating a new local history {}", persistenceId(), history);
175
176         Verify.verify(histories.put(historyId, history) == null);
177
178         final Throwable a = aborted;
179         if (a != null) {
180             history.localAbort(a);
181             histories.remove(historyId, history);
182             throw Throwables.propagate(a);
183         }
184
185         return history;
186     }
187
188     @Override
189     public final ClientTransaction createTransaction() {
190         return singleHistory.createTransaction();
191     }
192
193     @Override
194     public final void close() {
195         context().executeInActor(this::shutdown);
196     }
197
198     abstract Long resolveShardForPath(final YangInstanceIdentifier path);
199 }