Correct annotations
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import com.google.common.base.Verify;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.LinkedHashMap;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.function.Consumer;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
29 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
30 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
31 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
32 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
34 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
35 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
36 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
37 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
38 import org.opendaylight.controller.cluster.access.concepts.Request;
39 import org.opendaylight.controller.cluster.access.concepts.RequestException;
40 import org.opendaylight.controller.cluster.access.concepts.Response;
41 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
42 import org.opendaylight.yangtools.concepts.Identifiable;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Per-connection representation of a local history. This class handles state replication across a single connection.
50  *
51  * @author Robert Varga
52  */
53 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
54     private abstract static class AbstractLocal extends ProxyHistory {
55         private final ReadOnlyDataTree dataTree;
56
57         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
58             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
59             super(parent, connection, identifier);
60             this.dataTree = requireNonNull(dataTree);
61         }
62
63         final DataTreeSnapshot takeSnapshot() {
64             return dataTree.takeSnapshot();
65         }
66     }
67
68     private abstract static class AbstractRemote extends ProxyHistory {
69         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
70             final LocalHistoryIdentifier identifier) {
71             super(parent, connection, identifier);
72         }
73     }
74
75     private static final class Local extends AbstractLocal {
76         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
77                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
78
79         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
80         // the open one and attempts to create a new transaction again.
81         private LocalReadWriteProxyTransaction lastOpen;
82
83         private volatile LocalReadWriteProxyTransaction lastSealed;
84
85         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
86             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
87             super(parent, connection, identifier, dataTree);
88         }
89
90         @Override
91         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
92                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
93             checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
94
95             if (isDone) {
96                 // Done transactions do not register on our radar on should not have any state associated.
97                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
98                         : new LocalReadWriteProxyTransaction(this, txId);
99             }
100
101             // onTransactionCompleted() runs concurrently
102             final LocalReadWriteProxyTransaction localSealed = lastSealed;
103             final DataTreeSnapshot baseSnapshot;
104             if (localSealed != null) {
105                 baseSnapshot = localSealed.getSnapshot();
106             } else {
107                 baseSnapshot = takeSnapshot();
108             }
109
110             if (snapshotOnly) {
111                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
112             }
113
114             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
115             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
116             return lastOpen;
117         }
118
119         @Override
120         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
121             return createClient(parent(), connection, getIdentifier());
122         }
123
124         @Override
125         void onTransactionAborted(final AbstractProxyTransaction tx) {
126             if (tx.equals(lastOpen)) {
127                 lastOpen = null;
128             }
129         }
130
131         @Override
132         void onTransactionCompleted(final AbstractProxyTransaction tx) {
133             Verify.verify(tx instanceof LocalProxyTransaction);
134             if (tx instanceof LocalReadWriteProxyTransaction
135                     && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
136                 LOG.debug("Completed last sealed transaction {}", tx);
137             }
138         }
139
140         @Override
141         void onTransactionSealed(final AbstractProxyTransaction tx) {
142             checkState(tx.equals(lastOpen));
143             lastSealed = lastOpen;
144             lastOpen = null;
145         }
146     }
147
148     private static final class LocalSingle extends AbstractLocal {
149         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
150             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
151             super(parent, connection, identifier, dataTree);
152         }
153
154         @Override
155         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
156                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
157             final DataTreeSnapshot snapshot = takeSnapshot();
158             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
159                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
160         }
161
162         @Override
163         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
164             return createSingle(parent(), connection, getIdentifier());
165         }
166     }
167
168     private static final class Remote extends AbstractRemote {
169         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
170             final LocalHistoryIdentifier identifier) {
171             super(parent, connection, identifier);
172         }
173
174         @Override
175         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
176                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
177             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
178         }
179
180         @Override
181         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
182             return createClient(parent(), connection, getIdentifier());
183         }
184     }
185
186     private static final class RemoteSingle extends AbstractRemote {
187         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
188             final LocalHistoryIdentifier identifier) {
189             super(parent, connection, identifier);
190         }
191
192         @Override
193         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
194                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
195             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
196         }
197
198         @Override
199         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
200             return createSingle(parent(), connection, getIdentifier());
201         }
202     }
203
204     private static final class RequestReplayException extends RequestException {
205         private static final long serialVersionUID = 1L;
206
207         RequestReplayException(final String format, final Object... args) {
208             super(String.format(format, args));
209         }
210
211         @Override
212         public boolean isRetriable() {
213             return false;
214         }
215     }
216
217     private final class ReconnectCohort extends ProxyReconnectCohort {
218         @Override
219         public LocalHistoryIdentifier getIdentifier() {
220             return identifier;
221         }
222
223         @Holding("lock")
224         @Override
225         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
226             // First look for our Create message
227             Iterator<ConnectionEntry> it = previousEntries.iterator();
228             while (it.hasNext()) {
229                 final ConnectionEntry e = it.next();
230                 final Request<?, ?> req = e.getRequest();
231                 if (identifier.equals(req.getTarget())) {
232                     Verify.verify(req instanceof LocalHistoryRequest);
233                     if (req instanceof CreateLocalHistoryRequest) {
234                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
235                         it.remove();
236                         break;
237                     }
238                 }
239             }
240
241             for (AbstractProxyTransaction t : proxies.values()) {
242                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
243                 t.replayMessages(successor, previousEntries);
244             }
245
246             // Now look for any finalizing messages
247             it = previousEntries.iterator();
248             while (it.hasNext()) {
249                 final ConnectionEntry e  = it.next();
250                 final Request<?, ?> req = e.getRequest();
251                 if (identifier.equals(req.getTarget())) {
252                     Verify.verify(req instanceof LocalHistoryRequest);
253                     if (req instanceof DestroyLocalHistoryRequest) {
254                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
255                         it.remove();
256                         break;
257                     }
258                 }
259             }
260         }
261
262         @GuardedBy("lock")
263         @Override
264         ProxyHistory finishReconnect() {
265             final ProxyHistory ret = Verify.verifyNotNull(successor);
266
267             for (AbstractProxyTransaction t : proxies.values()) {
268                 t.finishReconnect();
269             }
270
271             LOG.debug("Finished reconnecting proxy history {}", this);
272             lock.unlock();
273             return ret;
274         }
275
276         @Override
277         void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
278                 throws RequestException {
279             final Request<?, ?> request = entry.getRequest();
280             if (request instanceof TransactionRequest) {
281                 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
282                     entry.getEnqueuedTicks());
283             } else if (request instanceof LocalHistoryRequest) {
284                 replayTo.accept(entry);
285             } else {
286                 throw new IllegalArgumentException("Unhandled request " + request);
287             }
288         }
289
290         @Override
291         void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
292                 throws RequestException {
293             final Request<?, ?> request = entry.getRequest();
294             if (request instanceof TransactionRequest) {
295                 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
296             } else if (request instanceof LocalHistoryRequest) {
297                 forwardTo.accept(entry);
298             } else {
299                 throw new IllegalArgumentException("Unhandled request " + request);
300             }
301         }
302
303         private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
304                 throws RequestReplayException {
305             final AbstractProxyTransaction proxy;
306             lock.lock();
307             try {
308                 proxy = proxies.get(request.getTarget());
309             } finally {
310                 lock.unlock();
311             }
312             if (proxy != null) {
313                 return proxy;
314             }
315
316             throw new RequestReplayException("Failed to find proxy for %s", request);
317         }
318     }
319
320     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
321
322     private final Lock lock = new ReentrantLock();
323     private final @NonNull LocalHistoryIdentifier identifier;
324     private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
325     private final @NonNull AbstractClientHistory parent;
326
327     @GuardedBy("lock")
328     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
329     @GuardedBy("lock")
330     private ProxyHistory successor;
331
332     private ProxyHistory(final AbstractClientHistory parent,
333             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
334         this.parent = requireNonNull(parent);
335         this.connection = requireNonNull(connection);
336         this.identifier = requireNonNull(identifier);
337     }
338
339     static ProxyHistory createClient(final AbstractClientHistory parent,
340             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
341         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
342         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
343              : new Remote(parent, connection, identifier);
344     }
345
346     static ProxyHistory createSingle(final AbstractClientHistory parent,
347             final AbstractClientConnection<ShardBackendInfo> connection,
348             final LocalHistoryIdentifier identifier) {
349         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
350         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
351              : new RemoteSingle(parent, connection, identifier);
352     }
353
354     @Override
355     public LocalHistoryIdentifier getIdentifier() {
356         return identifier;
357     }
358
359     final ClientActorContext context() {
360         return connection.context();
361     }
362
363     final long currentTime() {
364         return connection.currentTime();
365     }
366
367     final ActorRef localActor() {
368         return connection.localActor();
369     }
370
371     final AbstractClientHistory parent() {
372         return parent;
373     }
374
375     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
376             final boolean snapshotOnly) {
377         return createTransactionProxy(txId, snapshotOnly, false);
378     }
379
380     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
381             final boolean isDone) {
382         lock.lock();
383         try {
384             if (successor != null) {
385                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
386             }
387
388             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
389             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
390             proxies.put(proxyId, ret);
391             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
392             return ret;
393         } finally {
394             lock.unlock();
395         }
396     }
397
398     final void abortTransaction(final AbstractProxyTransaction tx) {
399         lock.lock();
400         try {
401             // Removal will be completed once purge completes
402             LOG.debug("Proxy {} aborted transaction {}", this, tx);
403             onTransactionAborted(tx);
404         } finally {
405             lock.unlock();
406         }
407     }
408
409     final void completeTransaction(final AbstractProxyTransaction tx) {
410         lock.lock();
411         try {
412             // Removal will be completed once purge completes
413             LOG.debug("Proxy {} completing transaction {}", this, tx);
414             onTransactionCompleted(tx);
415         } finally {
416             lock.unlock();
417         }
418     }
419
420     void purgeTransaction(final AbstractProxyTransaction tx) {
421         lock.lock();
422         try {
423             proxies.remove(tx.getIdentifier());
424             LOG.debug("Proxy {} purged transaction {}", this, tx);
425         } finally {
426             lock.unlock();
427         }
428     }
429
430     final void close() {
431         lock.lock();
432         try {
433             if (successor != null) {
434                 successor.close();
435                 return;
436             }
437
438             LOG.debug("Proxy {} invoking destroy", this);
439             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
440                 this::onDestroyComplete);
441         } finally {
442             lock.unlock();
443         }
444     }
445
446     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
447             final long enqueuedTicks) {
448         connection.enqueueRequest(request, callback, enqueuedTicks);
449     }
450
451     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
452         connection.sendRequest(request, callback);
453     }
454
455     @Holding("lock")
456     @SuppressWarnings("checkstyle:hiddenField")
457     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
458             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
459
460     @Holding("lock")
461     @SuppressWarnings("checkstyle:hiddenField")
462     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
463
464     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
465     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
466         lock.lock();
467         if (successor != null) {
468             lock.unlock();
469             throw new IllegalStateException("Proxy history " + this + " already has a successor");
470         }
471
472         successor = createSuccessor(newConnection);
473         LOG.debug("History {} instantiated successor {}", this, successor);
474
475         for (AbstractProxyTransaction t : proxies.values()) {
476             t.startReconnect();
477         }
478
479         return new ReconnectCohort();
480     }
481
482     private void onDestroyComplete(final Response<?, ?> response) {
483         LOG.debug("Proxy {} destroy completed with {}", this, response);
484
485         lock.lock();
486         try {
487             parent.onProxyDestroyed(this);
488             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
489                 this::onPurgeComplete);
490         } finally {
491             lock.unlock();
492         }
493     }
494
495     private void onPurgeComplete(final Response<?, ?> response) {
496         LOG.debug("Proxy {} purge completed with {}", this, response);
497     }
498
499     @Holding("lock")
500     void onTransactionAborted(final AbstractProxyTransaction tx) {
501         // No-op for most implementations
502     }
503
504     @Holding("lock")
505     void onTransactionCompleted(final AbstractProxyTransaction tx) {
506         // No-op for most implementations
507     }
508
509     void onTransactionSealed(final AbstractProxyTransaction tx) {
510         // No-op on most implementations
511     }
512 }