Bump odlparent to 5.0.0
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.Consumer;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.checkerframework.checker.lock.qual.Holding;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
27 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
29 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.Response;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Per-connection representation of a local history. This class handles state replication across a single connection.
47  *
48  * @author Robert Varga
49  */
50 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
51     private abstract static class AbstractLocal extends ProxyHistory {
52         private final DataTree dataTree;
53
54         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
55             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
56             super(parent, connection, identifier);
57             this.dataTree = Preconditions.checkNotNull(dataTree);
58         }
59
60         final DataTreeSnapshot takeSnapshot() {
61             return dataTree.takeSnapshot();
62         }
63     }
64
65     private abstract static class AbstractRemote extends ProxyHistory {
66         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
67             final LocalHistoryIdentifier identifier) {
68             super(parent, connection, identifier);
69         }
70     }
71
72     private static final class Local extends AbstractLocal {
73         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
74                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
75
76         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77         // the open one and attempts to create a new transaction again.
78         private LocalReadWriteProxyTransaction lastOpen;
79
80         private volatile LocalReadWriteProxyTransaction lastSealed;
81
82         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
83             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
84             super(parent, connection, identifier, dataTree);
85         }
86
87         @Override
88         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
90             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
91
92             if (isDone) {
93                 // Done transactions do not register on our radar on should not have any state associated.
94                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
95                         : new LocalReadWriteProxyTransaction(this, txId);
96             }
97
98             // onTransactionCompleted() runs concurrently
99             final LocalReadWriteProxyTransaction localSealed = lastSealed;
100             final DataTreeSnapshot baseSnapshot;
101             if (localSealed != null) {
102                 baseSnapshot = localSealed.getSnapshot();
103             } else {
104                 baseSnapshot = takeSnapshot();
105             }
106
107             if (snapshotOnly) {
108                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
109             }
110
111             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
112             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
113             return lastOpen;
114         }
115
116         @Override
117         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
118             return createClient(parent(), connection, getIdentifier());
119         }
120
121         @Override
122         void onTransactionAborted(final AbstractProxyTransaction tx) {
123             if (tx.equals(lastOpen)) {
124                 lastOpen = null;
125             }
126         }
127
128         @Override
129         void onTransactionCompleted(final AbstractProxyTransaction tx) {
130             Verify.verify(tx instanceof LocalProxyTransaction);
131             if (tx instanceof LocalReadWriteProxyTransaction
132                     && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
133                 LOG.debug("Completed last sealed transaction {}", tx);
134             }
135         }
136
137         @Override
138         void onTransactionSealed(final AbstractProxyTransaction tx) {
139             Preconditions.checkState(tx.equals(lastOpen));
140             lastSealed = lastOpen;
141             lastOpen = null;
142         }
143     }
144
145     private static final class LocalSingle extends AbstractLocal {
146         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
147             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
148             super(parent, connection, identifier, dataTree);
149         }
150
151         @Override
152         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
153                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
154             final DataTreeSnapshot snapshot = takeSnapshot();
155             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
156                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
157         }
158
159         @Override
160         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
161             return createSingle(parent(), connection, getIdentifier());
162         }
163     }
164
165     private static final class Remote extends AbstractRemote {
166         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
167             final LocalHistoryIdentifier identifier) {
168             super(parent, connection, identifier);
169         }
170
171         @Override
172         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
173                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
174             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
175         }
176
177         @Override
178         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
179             return createClient(parent(), connection, getIdentifier());
180         }
181     }
182
183     private static final class RemoteSingle extends AbstractRemote {
184         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
185             final LocalHistoryIdentifier identifier) {
186             super(parent, connection, identifier);
187         }
188
189         @Override
190         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
191                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
192             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
193         }
194
195         @Override
196         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
197             return createSingle(parent(), connection, getIdentifier());
198         }
199     }
200
201     private static final class RequestReplayException extends RequestException {
202         private static final long serialVersionUID = 1L;
203
204         RequestReplayException(final String format, final Object... args) {
205             super(String.format(format, args));
206         }
207
208         @Override
209         public boolean isRetriable() {
210             return false;
211         }
212     }
213
214     private final class ReconnectCohort extends ProxyReconnectCohort {
215         @Override
216         public LocalHistoryIdentifier getIdentifier() {
217             return identifier;
218         }
219
220         @Holding("lock")
221         @Override
222         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
223             // First look for our Create message
224             Iterator<ConnectionEntry> it = previousEntries.iterator();
225             while (it.hasNext()) {
226                 final ConnectionEntry e = it.next();
227                 final Request<?, ?> req = e.getRequest();
228                 if (identifier.equals(req.getTarget())) {
229                     Verify.verify(req instanceof LocalHistoryRequest);
230                     if (req instanceof CreateLocalHistoryRequest) {
231                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
232                         it.remove();
233                         break;
234                     }
235                 }
236             }
237
238             for (AbstractProxyTransaction t : proxies.values()) {
239                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
240                 t.replayMessages(successor, previousEntries);
241             }
242
243             // Now look for any finalizing messages
244             it = previousEntries.iterator();
245             while (it.hasNext()) {
246                 final ConnectionEntry e  = it.next();
247                 final Request<?, ?> req = e.getRequest();
248                 if (identifier.equals(req.getTarget())) {
249                     Verify.verify(req instanceof LocalHistoryRequest);
250                     if (req instanceof DestroyLocalHistoryRequest) {
251                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
252                         it.remove();
253                         break;
254                     }
255                 }
256             }
257         }
258
259         @GuardedBy("lock")
260         @Override
261         ProxyHistory finishReconnect() {
262             final ProxyHistory ret = Verify.verifyNotNull(successor);
263
264             for (AbstractProxyTransaction t : proxies.values()) {
265                 t.finishReconnect();
266             }
267
268             LOG.debug("Finished reconnecting proxy history {}", this);
269             lock.unlock();
270             return ret;
271         }
272
273         @Override
274         void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
275                 throws RequestException {
276             final Request<?, ?> request = entry.getRequest();
277             if (request instanceof TransactionRequest) {
278                 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
279                     entry.getEnqueuedTicks());
280             } else if (request instanceof LocalHistoryRequest) {
281                 replayTo.accept(entry);
282             } else {
283                 throw new IllegalArgumentException("Unhandled request " + request);
284             }
285         }
286
287         @Override
288         void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
289                 throws RequestException {
290             final Request<?, ?> request = entry.getRequest();
291             if (request instanceof TransactionRequest) {
292                 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
293             } else if (request instanceof LocalHistoryRequest) {
294                 forwardTo.accept(entry);
295             } else {
296                 throw new IllegalArgumentException("Unhandled request " + request);
297             }
298         }
299
300         private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
301                 throws RequestReplayException {
302             final AbstractProxyTransaction proxy;
303             lock.lock();
304             try {
305                 proxy = proxies.get(request.getTarget());
306             } finally {
307                 lock.unlock();
308             }
309             if (proxy != null) {
310                 return proxy;
311             }
312
313             throw new RequestReplayException("Failed to find proxy for %s", request);
314         }
315     }
316
317     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
318
319     private final Lock lock = new ReentrantLock();
320     private final LocalHistoryIdentifier identifier;
321     private final AbstractClientConnection<ShardBackendInfo> connection;
322     private final AbstractClientHistory parent;
323
324     @GuardedBy("lock")
325     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
326     @GuardedBy("lock")
327     private ProxyHistory successor;
328
329     private ProxyHistory(final AbstractClientHistory parent,
330             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
331         this.parent = Preconditions.checkNotNull(parent);
332         this.connection = Preconditions.checkNotNull(connection);
333         this.identifier = Preconditions.checkNotNull(identifier);
334     }
335
336     static ProxyHistory createClient(final AbstractClientHistory parent,
337             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
338         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
339         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
340              : new Remote(parent, connection, identifier);
341     }
342
343     static ProxyHistory createSingle(final AbstractClientHistory parent,
344             final AbstractClientConnection<ShardBackendInfo> connection,
345             final LocalHistoryIdentifier identifier) {
346         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
347         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
348              : new RemoteSingle(parent, connection, identifier);
349     }
350
351     @Override
352     public LocalHistoryIdentifier getIdentifier() {
353         return identifier;
354     }
355
356     final ClientActorContext context() {
357         return connection.context();
358     }
359
360     final long currentTime() {
361         return connection.currentTime();
362     }
363
364     final ActorRef localActor() {
365         return connection.localActor();
366     }
367
368     final AbstractClientHistory parent() {
369         return parent;
370     }
371
372     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
373             final boolean snapshotOnly) {
374         return createTransactionProxy(txId, snapshotOnly, false);
375     }
376
377     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
378             final boolean isDone) {
379         lock.lock();
380         try {
381             if (successor != null) {
382                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
383             }
384
385             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
386             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
387             proxies.put(proxyId, ret);
388             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
389             return ret;
390         } finally {
391             lock.unlock();
392         }
393     }
394
395     final void abortTransaction(final AbstractProxyTransaction tx) {
396         lock.lock();
397         try {
398             // Removal will be completed once purge completes
399             LOG.debug("Proxy {} aborted transaction {}", this, tx);
400             onTransactionAborted(tx);
401         } finally {
402             lock.unlock();
403         }
404     }
405
406     final void completeTransaction(final AbstractProxyTransaction tx) {
407         lock.lock();
408         try {
409             // Removal will be completed once purge completes
410             LOG.debug("Proxy {} completing transaction {}", this, tx);
411             onTransactionCompleted(tx);
412         } finally {
413             lock.unlock();
414         }
415     }
416
417     void purgeTransaction(final AbstractProxyTransaction tx) {
418         lock.lock();
419         try {
420             proxies.remove(tx.getIdentifier());
421             LOG.debug("Proxy {} purged transaction {}", this, tx);
422         } finally {
423             lock.unlock();
424         }
425     }
426
427     final void close() {
428         lock.lock();
429         try {
430             if (successor != null) {
431                 successor.close();
432                 return;
433             }
434
435             LOG.debug("Proxy {} invoking destroy", this);
436             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
437                 this::onDestroyComplete);
438         } finally {
439             lock.unlock();
440         }
441     }
442
443     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
444             final long enqueuedTicks) {
445         connection.enqueueRequest(request, callback, enqueuedTicks);
446     }
447
448     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
449         connection.sendRequest(request, callback);
450     }
451
452     @GuardedBy("lock")
453     @SuppressWarnings("checkstyle:hiddenField")
454     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
455             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
456
457     @SuppressWarnings("checkstyle:hiddenField")
458     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
459
460     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
461     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
462         lock.lock();
463         if (successor != null) {
464             lock.unlock();
465             throw new IllegalStateException("Proxy history " + this + " already has a successor");
466         }
467
468         successor = createSuccessor(newConnection);
469         LOG.debug("History {} instantiated successor {}", this, successor);
470
471         for (AbstractProxyTransaction t : proxies.values()) {
472             t.startReconnect();
473         }
474
475         return new ReconnectCohort();
476     }
477
478     private void onDestroyComplete(final Response<?, ?> response) {
479         LOG.debug("Proxy {} destroy completed with {}", this, response);
480
481         lock.lock();
482         try {
483             parent.onProxyDestroyed(this);
484             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
485                 this::onPurgeComplete);
486         } finally {
487             lock.unlock();
488         }
489     }
490
491     private void onPurgeComplete(final Response<?, ?> response) {
492         LOG.debug("Proxy {} purge completed with {}", this, response);
493     }
494
495     @Holding("lock")
496     void onTransactionAborted(final AbstractProxyTransaction tx) {
497         // No-op for most implementations
498     }
499
500     @Holding("lock")
501     void onTransactionCompleted(final AbstractProxyTransaction tx) {
502         // No-op for most implementations
503     }
504
505     void onTransactionSealed(final AbstractProxyTransaction tx) {
506         // No-op on most implementations
507     }
508 }