BUG-5280: expose queue messages during reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.Request;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Per-connection representation of a local history. This class handles state replication across a single connection.
43  *
44  * @author Robert Varga
45  */
46 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
47     private abstract static class AbstractLocal extends ProxyHistory {
48         private final DataTree dataTree;
49
50         AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
51             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
52             super(connection, identifier);
53             this.dataTree = Preconditions.checkNotNull(dataTree);
54         }
55
56         final DataTreeSnapshot takeSnapshot() {
57             return dataTree.takeSnapshot();
58         }
59     }
60
61     private abstract static class AbstractRemote extends ProxyHistory {
62         AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
63             final LocalHistoryIdentifier identifier) {
64             super(connection, identifier);
65         }
66
67         @Override
68         final AbstractProxyTransaction doCreateTransactionProxy(
69                 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
70             return new RemoteProxyTransaction(this, txId);
71         }
72     }
73
74     private static final class Local extends AbstractLocal {
75         private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
76                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
77
78         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
79         // the open one and attempts to create a new transaction again.
80         private LocalProxyTransaction lastOpen;
81
82         private volatile LocalProxyTransaction lastSealed;
83
84         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
85             final DataTree dataTree) {
86             super(connection, identifier, dataTree);
87         }
88
89         @Override
90         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
91                 final TransactionIdentifier txId) {
92             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
93
94             // onTransactionCompleted() runs concurrently
95             final LocalProxyTransaction localSealed = lastSealed;
96             final DataTreeSnapshot baseSnapshot;
97             if (localSealed != null) {
98                 baseSnapshot = localSealed.getSnapshot();
99             } else {
100                 baseSnapshot = takeSnapshot();
101             }
102
103             lastOpen = new LocalProxyTransaction(this, txId,
104                 (CursorAwareDataTreeModification) baseSnapshot.newModification());
105             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
106             return lastOpen;
107         }
108
109         @Override
110         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
111             return createClient(connection, getIdentifier());
112         }
113
114         @Override
115         void onTransactionAborted(final AbstractProxyTransaction tx) {
116             Preconditions.checkState(tx.equals(lastOpen));
117             lastOpen = null;
118         }
119
120         @Override
121         void onTransactionCompleted(final AbstractProxyTransaction tx) {
122             Verify.verify(tx instanceof LocalProxyTransaction);
123
124             if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
125                 LOG.debug("Completed last sealed transaction {}", tx);
126             }
127         }
128
129         @Override
130         void onTransactionSealed(final AbstractProxyTransaction tx) {
131             Preconditions.checkState(tx.equals(lastOpen));
132             lastSealed = lastOpen;
133             lastOpen = null;
134         }
135     }
136
137     private static final class LocalSingle extends AbstractLocal {
138         LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
139             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
140             super(connection, identifier, dataTree);
141         }
142
143         @Override
144         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
145                 final TransactionIdentifier txId) {
146             return new LocalProxyTransaction(this, txId,
147                 (CursorAwareDataTreeModification) takeSnapshot().newModification());
148         }
149
150         @Override
151         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
152             return createSingle(connection, getIdentifier());
153         }
154     }
155
156     private static final class Remote extends AbstractRemote {
157         Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
158             super(connection, identifier);
159         }
160
161         @Override
162         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
163             return createClient(connection, getIdentifier());
164         }
165     }
166
167     private static final class RemoteSingle extends AbstractRemote {
168         RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
169             final LocalHistoryIdentifier identifier) {
170             super(connection, identifier);
171         }
172
173         @Override
174         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
175             return createSingle(connection, getIdentifier());
176         }
177     }
178
179     private static final class RequestReplayException extends RequestException {
180         private static final long serialVersionUID = 1L;
181
182         RequestReplayException(final String format, final Object... args) {
183             super(String.format(format, args));
184         }
185
186         @Override
187         public boolean isRetriable() {
188             return false;
189         }
190     }
191
192     private final class ReconnectCohort extends ProxyReconnectCohort {
193         @Override
194         public LocalHistoryIdentifier getIdentifier() {
195             return identifier;
196         }
197
198         @GuardedBy("lock")
199         @Override
200         void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
201             // First look for our Create message
202             for (ConnectionEntry e : previousEntries) {
203                 final Request<?, ?> req = e.getRequest();
204                 if (identifier.equals(req.getTarget())) {
205                     Verify.verify(req instanceof LocalHistoryRequest);
206                     if (req instanceof CreateLocalHistoryRequest) {
207                         successor.connection.sendRequest(req, e.getCallback());
208                         break;
209                     }
210                 }
211             }
212
213             for (AbstractProxyTransaction t : proxies.values()) {
214                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
215                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
216                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
217                 t.replayMessages(newProxy, previousEntries);
218             }
219
220             // Now look for any finalizing messages
221             for (ConnectionEntry e : previousEntries) {
222                 final Request<?, ?> req = e.getRequest();
223                 if (identifier.equals(req.getTarget())) {
224                     Verify.verify(req instanceof LocalHistoryRequest);
225                     successor.connection.sendRequest(req, e.getCallback());
226                 }
227             }
228         }
229
230         @GuardedBy("lock")
231         @Override
232         ProxyHistory finishReconnect() {
233             final ProxyHistory ret = Verify.verifyNotNull(successor);
234
235             for (AbstractProxyTransaction t : proxies.values()) {
236                 t.finishReconnect();
237             }
238
239             LOG.debug("Finished reconnecting proxy history {}", this);
240             lock.unlock();
241             return ret;
242         }
243
244         @Override
245         void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
246                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
247             if (request instanceof TransactionRequest) {
248                 replayTransactionRequest((TransactionRequest<?>) request, callback);
249             } else if (request instanceof LocalHistoryRequest) {
250                 replayTo.accept(request, callback);
251             } else {
252                 throw new IllegalArgumentException("Unhandled request " + request);
253             }
254         }
255
256         private void replayTransactionRequest(final TransactionRequest<?> request,
257                 final Consumer<Response<?, ?>> callback) throws RequestException {
258
259             final AbstractProxyTransaction proxy;
260             lock.lock();
261             try {
262                 proxy = proxies.get(request.getTarget());
263             } finally {
264                 lock.unlock();
265             }
266             if (proxy == null) {
267                 throw new RequestReplayException("Failed to find proxy for %s", request);
268             }
269
270             proxy.replayRequest(request, callback);
271         }
272     }
273
    private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);

    // Protects the proxy map and the successor link. startReconnect() acquires it and returns with it held,
    // the matching release happens in ReconnectCohort.finishReconnect().
    private final Lock lock = new ReentrantLock();
    // Identifier of the local history represented by this proxy
    private final LocalHistoryIdentifier identifier;
    // Connection used for sending requests on behalf of this history
    private final AbstractClientConnection<ShardBackendInfo> connection;

    // Transaction proxies allocated by this history and not yet aborted or completed, keyed by proxy identifier.
    // A LinkedHashMap, so iteration follows allocation order.
    @GuardedBy("lock")
    private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
    // History created on the new connection by startReconnect(); null until a reconnect has been started
    @GuardedBy("lock")
    private ProxyHistory successor;
284
285     private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
286             final LocalHistoryIdentifier identifier) {
287         this.connection = Preconditions.checkNotNull(connection);
288         this.identifier = Preconditions.checkNotNull(identifier);
289     }
290
291     static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
292             final LocalHistoryIdentifier identifier) {
293         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
294         return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
295              : new Remote(connection, identifier);
296     }
297
298     static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
299             final LocalHistoryIdentifier identifier) {
300         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
301         return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
302              : new RemoteSingle(connection, identifier);
303     }
304
305     @Override
306     public LocalHistoryIdentifier getIdentifier() {
307         return identifier;
308     }
309
310     final ActorRef localActor() {
311         return connection.localActor();
312     }
313
314     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
315         lock.lock();
316         try {
317             if (successor != null) {
318                 return successor.createTransactionProxy(txId);
319             }
320
321             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
322             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
323             proxies.put(proxyId, ret);
324             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
325             return ret;
326         } finally {
327             lock.unlock();
328         }
329     }
330
331     final void abortTransaction(final AbstractProxyTransaction tx) {
332         lock.lock();
333         try {
334             proxies.remove(tx.getIdentifier());
335             LOG.debug("Proxy {} aborting transaction {}", this, tx);
336             onTransactionAborted(tx);
337         } finally {
338             lock.unlock();
339         }
340     }
341
342     final void completeTransaction(final AbstractProxyTransaction tx) {
343         lock.lock();
344         try {
345             proxies.remove(tx.getIdentifier());
346             LOG.debug("Proxy {} completing transaction {}", this, tx);
347             onTransactionCompleted(tx);
348         } finally {
349             lock.unlock();
350         }
351     }
352
353     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
354         connection.sendRequest(request, callback);
355     }
356
    /**
     * Instantiate the implementation-specific transaction proxy. Invoked from
     * {@link #createTransactionProxy(TransactionIdentifier)} with the history lock held.
     *
     * @param connection connection of this history
     * @param txId identifier of the proxy, already re-homed under this history's identifier
     * @return the new transaction proxy
     */
    @GuardedBy("lock")
    abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
            TransactionIdentifier txId);
360
    /**
     * Create the history which takes over from this one on a new connection. The implementations in this file
     * preserve the identifier and the client/single flavor of the history.
     *
     * @param connection the new connection
     * @return successor history attached to {@code connection}
     */
    abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
362
363     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
364     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
365         lock.lock();
366         if (successor != null) {
367             lock.unlock();
368             throw new IllegalStateException("Proxy history " + this + " already has a successor");
369         }
370
371         successor = createSuccessor(newConnection);
372         LOG.debug("History {} instantiated successor {}", this, successor);
373
374         for (AbstractProxyTransaction t : proxies.values()) {
375             t.startReconnect();
376         }
377
378         return new ReconnectCohort();
379     }
380
381     @GuardedBy("lock")
382     void onTransactionAborted(final AbstractProxyTransaction tx) {
383         // No-op for most implementations
384     }
385
386     @GuardedBy("lock")
387     void onTransactionCompleted(final AbstractProxyTransaction tx) {
388         // No-op for most implementations
389     }
390
391     void onTransactionSealed(final AbstractProxyTransaction tx) {
392         // No-op on most implementations
393     }
394 }