BUG-8372: fix AbstractProxyTransaction.replayMessages()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.Request;
33 import org.opendaylight.controller.cluster.access.concepts.RequestException;
34 import org.opendaylight.controller.cluster.access.concepts.Response;
35 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Per-connection representation of a local history. This class handles state replication across a single connection.
44  *
45  * @author Robert Varga
46  */
47 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
    /**
     * Common base for histories that hold a {@link DataTree} and can therefore hand out snapshots of it
     * to the transactions they create.
     */
    private abstract static class AbstractLocal extends ProxyHistory {
        // Source of the snapshots returned by takeSnapshot(); never null
        private final DataTree dataTree;

        AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
            super(parent, connection, identifier);
            this.dataTree = Preconditions.checkNotNull(dataTree);
        }

        /**
         * Take a fresh snapshot of the backing data tree.
         *
         * @return snapshot obtained from {@link DataTree#takeSnapshot()}
         */
        final DataTreeSnapshot takeSnapshot() {
            return dataTree.takeSnapshot();
        }
    }
61
    /**
     * Common base for histories created when no {@link DataTree} is available from the connection's backend
     * information (see {@code createClient()} and {@code createSingle()}). Adds no state of its own.
     */
    private abstract static class AbstractRemote extends ProxyHistory {
        AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
            final LocalHistoryIdentifier identifier) {
            super(parent, connection, identifier);
        }
    }
68
69     private static final class Local extends AbstractLocal {
70         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
71                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
72
73         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
74         // the open one and attempts to create a new transaction again.
75         private LocalReadWriteProxyTransaction lastOpen;
76
77         private volatile LocalReadWriteProxyTransaction lastSealed;
78
79         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
80             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
81             super(parent, connection, identifier, dataTree);
82         }
83
84         @Override
85         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
86                 final TransactionIdentifier txId, final boolean snapshotOnly) {
87             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
88
89             // onTransactionCompleted() runs concurrently
90             final LocalReadWriteProxyTransaction localSealed = lastSealed;
91             final DataTreeSnapshot baseSnapshot;
92             if (localSealed != null) {
93                 baseSnapshot = localSealed.getSnapshot();
94             } else {
95                 baseSnapshot = takeSnapshot();
96             }
97
98             if (snapshotOnly) {
99                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
100             }
101
102             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
103             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
104             return lastOpen;
105         }
106
107         @Override
108         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
109             return createClient(parent(), connection, getIdentifier());
110         }
111
112         @Override
113         void onTransactionAborted(final AbstractProxyTransaction tx) {
114             if (tx.equals(lastOpen)) {
115                 lastOpen = null;
116             }
117         }
118
119         @Override
120         void onTransactionCompleted(final AbstractProxyTransaction tx) {
121             Verify.verify(tx instanceof LocalProxyTransaction);
122             if (tx instanceof LocalReadWriteProxyTransaction) {
123                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
124                     LOG.debug("Completed last sealed transaction {}", tx);
125                 }
126             }
127         }
128
129         @Override
130         void onTransactionSealed(final AbstractProxyTransaction tx) {
131             Preconditions.checkState(tx.equals(lastOpen));
132             lastSealed = lastOpen;
133             lastOpen = null;
134         }
135     }
136
137     private static final class LocalSingle extends AbstractLocal {
138         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
139             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
140             super(parent, connection, identifier, dataTree);
141         }
142
143         @Override
144         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
145                 final TransactionIdentifier txId, final boolean snapshotOnly) {
146             final DataTreeSnapshot snapshot = takeSnapshot();
147             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
148                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
149         }
150
151         @Override
152         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
153             return createSingle(parent(), connection, getIdentifier());
154         }
155     }
156
    /**
     * History used by {@code createClient()} when the backend information carries no {@link DataTree}. All
     * transactions are {@link RemoteProxyTransaction}s.
     */
    private static final class Remote extends AbstractRemote {
        Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
            final LocalHistoryIdentifier identifier) {
            super(parent, connection, identifier);
        }

        @Override
        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
                final TransactionIdentifier txId, final boolean snapshotOnly) {
            // NOTE(review): the last argument is 'true' here and 'false' in RemoteSingle; presumably it marks the
            // transaction as belonging to a chained history -- confirm against RemoteProxyTransaction.
            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
        }

        @Override
        ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
            // Re-evaluates the new connection's backend info, so the successor may be Local or Remote
            return createClient(parent(), connection, getIdentifier());
        }
    }
174
    /**
     * History used by {@code createSingle()} when the backend information carries no {@link DataTree}. All
     * transactions are {@link RemoteProxyTransaction}s.
     */
    private static final class RemoteSingle extends AbstractRemote {
        RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
            final LocalHistoryIdentifier identifier) {
            super(parent, connection, identifier);
        }

        @Override
        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
                final TransactionIdentifier txId, final boolean snapshotOnly) {
            // NOTE(review): the last argument is 'false' here and 'true' in Remote; presumably it marks the
            // transaction as not belonging to a chained history -- confirm against RemoteProxyTransaction.
            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
        }

        @Override
        ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
            // Re-evaluates the new connection's backend info, so the successor may be LocalSingle or RemoteSingle
            return createSingle(parent(), connection, getIdentifier());
        }
    }
192
    /**
     * Non-retriable failure raised while forwarding a request during reconnect, for example when no transaction
     * proxy can be found for the request's target.
     */
    private static final class RequestReplayException extends RequestException {
        private static final long serialVersionUID = 1L;

        /**
         * Construct the exception with a {@link String#format(String, Object...)}-style message.
         *
         * @param format format string, using {@code %s}-style specifiers
         * @param args format arguments
         */
        RequestReplayException(final String format, final Object... args) {
            super(String.format(format, args));
        }

        @Override
        public boolean isRetriable() {
            return false;
        }
    }
205
206     private final class ReconnectCohort extends ProxyReconnectCohort {
207         @Override
208         public LocalHistoryIdentifier getIdentifier() {
209             return identifier;
210         }
211
212         @GuardedBy("lock")
213         @Override
214         void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
215             // First look for our Create message
216             for (ConnectionEntry e : previousEntries) {
217                 final Request<?, ?> req = e.getRequest();
218                 if (identifier.equals(req.getTarget())) {
219                     Verify.verify(req instanceof LocalHistoryRequest);
220                     if (req instanceof CreateLocalHistoryRequest) {
221                         successor.connection.sendRequest(req, e.getCallback());
222                         break;
223                     }
224                 }
225             }
226
227             for (AbstractProxyTransaction t : proxies.values()) {
228                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
229                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
230                     t.isSnapshotOnly());
231                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
232                 t.replayMessages(newProxy, previousEntries);
233             }
234
235             // Now look for any finalizing messages
236             for (ConnectionEntry e : previousEntries) {
237                 final Request<?, ?> req = e.getRequest();
238                 if (identifier.equals(req.getTarget())) {
239                     Verify.verify(req instanceof LocalHistoryRequest);
240                     successor.connection.sendRequest(req, e.getCallback());
241                 }
242             }
243         }
244
245         @GuardedBy("lock")
246         @Override
247         ProxyHistory finishReconnect() {
248             final ProxyHistory ret = Verify.verifyNotNull(successor);
249
250             for (AbstractProxyTransaction t : proxies.values()) {
251                 t.finishReconnect();
252             }
253
254             LOG.debug("Finished reconnecting proxy history {}", this);
255             lock.unlock();
256             return ret;
257         }
258
259         @Override
260         void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
261                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
262             // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
263             //        period required to get into the queue.
264             if (request instanceof TransactionRequest) {
265                 forwardTransactionRequest((TransactionRequest<?>) request, callback);
266             } else if (request instanceof LocalHistoryRequest) {
267                 forwardTo.accept(request, callback);
268             } else {
269                 throw new IllegalArgumentException("Unhandled request " + request);
270             }
271         }
272
273         private void forwardTransactionRequest(final TransactionRequest<?> request,
274                 final Consumer<Response<?, ?>> callback) throws RequestException {
275
276             final AbstractProxyTransaction proxy;
277             lock.lock();
278             try {
279                 proxy = proxies.get(request.getTarget());
280             } finally {
281                 lock.unlock();
282             }
283             if (proxy == null) {
284                 throw new RequestReplayException("Failed to find proxy for %s", request);
285             }
286
287             proxy.forwardRequest(request, callback);
288         }
289     }
290
    private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);

    // Guards 'proxies' and 'successor'. startReconnect() acquires it and returns with it still held; it is
    // released by ReconnectCohort.finishReconnect().
    private final Lock lock = new ReentrantLock();
    private final LocalHistoryIdentifier identifier;
    // Connection on which this history sends its requests
    private final AbstractClientConnection<ShardBackendInfo> connection;
    private final AbstractClientHistory parent;

    // Transaction proxies created by this history and not yet aborted or completed, in creation order
    @GuardedBy("lock")
    private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
    // Non-null once startReconnect() has run; createTransactionProxy() and close() then delegate to it
    @GuardedBy("lock")
    private ProxyHistory successor;
302
    /**
     * Private constructor: instances are created through {@code createClient()} and {@code createSingle()}.
     *
     * @param parent client history this proxy belongs to
     * @param connection connection used for sending requests
     * @param identifier identifier of this history
     * @throws NullPointerException if any argument is null
     */
    private ProxyHistory(final AbstractClientHistory parent,
            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
        this.parent = Preconditions.checkNotNull(parent);
        this.connection = Preconditions.checkNotNull(connection);
        this.identifier = Preconditions.checkNotNull(identifier);
    }
309
310     static ProxyHistory createClient(final AbstractClientHistory parent,
311             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
312         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
313         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
314              : new Remote(parent, connection, identifier);
315     }
316
317     static ProxyHistory createSingle(final AbstractClientHistory parent,
318             final AbstractClientConnection<ShardBackendInfo> connection,
319             final LocalHistoryIdentifier identifier) {
320         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
321         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
322              : new RemoteSingle(parent, connection, identifier);
323     }
324
    /**
     * Return the identifier of this history, as supplied at construction.
     */
    @Override
    public LocalHistoryIdentifier getIdentifier() {
        return identifier;
    }
329
    /**
     * Return the local actor reference reported by this history's connection.
     */
    final ActorRef localActor() {
        return connection.localActor();
    }
333
    /**
     * Return the client history this proxy was created for.
     */
    final AbstractClientHistory parent() {
        return parent;
    }
337
338     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
339             final boolean snapshotOnly) {
340         lock.lock();
341         try {
342             if (successor != null) {
343                 return successor.createTransactionProxy(txId, snapshotOnly);
344             }
345
346             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
347             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
348             proxies.put(proxyId, ret);
349             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
350             return ret;
351         } finally {
352             lock.unlock();
353         }
354     }
355
356     final void abortTransaction(final AbstractProxyTransaction tx) {
357         lock.lock();
358         try {
359             proxies.remove(tx.getIdentifier());
360             LOG.debug("Proxy {} aborting transaction {}", this, tx);
361             onTransactionAborted(tx);
362         } finally {
363             lock.unlock();
364         }
365     }
366
367     final void completeTransaction(final AbstractProxyTransaction tx) {
368         lock.lock();
369         try {
370             proxies.remove(tx.getIdentifier());
371             LOG.debug("Proxy {} completing transaction {}", this, tx);
372             onTransactionCompleted(tx);
373         } finally {
374             lock.unlock();
375         }
376     }
377
378     final void close() {
379         lock.lock();
380         try {
381             if (successor != null) {
382                 successor.close();
383                 return;
384             }
385
386             LOG.debug("Proxy {} invoking destroy", this);
387             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
388                 this::onDestroyComplete);
389         } finally {
390             lock.unlock();
391         }
392     }
393
    /**
     * Send a transaction request on this history's connection.
     *
     * @param request request to send
     * @param callback callback handed to the connection together with the request
     */
    final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
        connection.sendRequest(request, callback);
    }
397
    /**
     * Instantiate a transaction proxy. Invoked from {@code createTransactionProxy()} with the lock held, and
     * only when this history has no successor.
     *
     * @param connection connection of this history
     * @param txId identifier of the transaction, already scoped to this history's identifier
     * @param snapshotOnly whether a snapshot-only transaction is requested
     * @return a new transaction proxy
     */
    @GuardedBy("lock")
    abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
            TransactionIdentifier txId, boolean snapshotOnly);

    /**
     * Create the history which takes over from this one on a new connection. Invoked from
     * {@code startReconnect()}.
     *
     * @param connection the new connection
     * @return successor history
     */
    abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
403
404     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
405     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
406         lock.lock();
407         if (successor != null) {
408             lock.unlock();
409             throw new IllegalStateException("Proxy history " + this + " already has a successor");
410         }
411
412         successor = createSuccessor(newConnection);
413         LOG.debug("History {} instantiated successor {}", this, successor);
414
415         for (AbstractProxyTransaction t : proxies.values()) {
416             t.startReconnect();
417         }
418
419         return new ReconnectCohort();
420     }
421
422     private void onDestroyComplete(final Response<?, ?> response) {
423         LOG.debug("Proxy {} destroy completed with {}", this, response);
424
425         lock.lock();
426         try {
427             parent.onProxyDestroyed(this);
428             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
429                 this::onPurgeComplete);
430         } finally {
431             lock.unlock();
432         }
433     }
434
    /**
     * Callback for the purge request sent by {@code onDestroyComplete()}. The response is only logged.
     */
    private void onPurgeComplete(final Response<?, ?> response) {
        LOG.debug("Proxy {} purge completed with {}", this, response);
    }
438
    /**
     * Hook invoked from {@code abortTransaction()}, with the lock held, after the transaction has been removed
     * from the set of tracked proxies.
     *
     * @param tx transaction which is being aborted
     */
    @GuardedBy("lock")
    void onTransactionAborted(final AbstractProxyTransaction tx) {
        // No-op for most implementations
    }
443
    /**
     * Hook invoked from {@code completeTransaction()}, with the lock held, after the transaction has been
     * removed from the set of tracked proxies.
     *
     * @param tx transaction which has completed
     */
    @GuardedBy("lock")
    void onTransactionCompleted(final AbstractProxyTransaction tx) {
        // No-op for most implementations
    }
448
    /**
     * Hook for implementations which need to react to a transaction being sealed. It is not invoked from
     * within this class and, unlike the abort/complete hooks, carries no lock annotation.
     *
     * @param tx transaction which has been sealed
     */
    void onTransactionSealed(final AbstractProxyTransaction tx) {
        // No-op on most implementations
    }
452 }