Minor sal-distributed-datastore cleanups
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import akka.actor.ActorRef;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.Collection;
18 import java.util.Iterator;
19 import java.util.LinkedHashMap;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import java.util.function.Consumer;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.checkerframework.checker.lock.qual.Holding;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
30 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
31 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
32 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
33 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
34 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
35 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
36 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
38 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
39 import org.opendaylight.controller.cluster.access.concepts.Request;
40 import org.opendaylight.controller.cluster.access.concepts.RequestException;
41 import org.opendaylight.controller.cluster.access.concepts.Response;
42 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
43 import org.opendaylight.yangtools.concepts.Identifiable;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
45 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Per-connection representation of a local history. This class handles state replication across a single connection.
51  *
52  * @author Robert Varga
53  */
54 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
55     private abstract static class AbstractLocal extends ProxyHistory {
56         private final ReadOnlyDataTree dataTree;
57
58         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
59             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
60             super(parent, connection, identifier);
61             this.dataTree = requireNonNull(dataTree);
62         }
63
64         final DataTreeSnapshot takeSnapshot() {
65             return dataTree.takeSnapshot();
66         }
67     }
68
69     private abstract static class AbstractRemote extends ProxyHistory {
70         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
71             final LocalHistoryIdentifier identifier) {
72             super(parent, connection, identifier);
73         }
74     }
75
76     private static final class Local extends AbstractLocal {
77         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
78                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
79
80         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
81         // the open one and attempts to create a new transaction again.
82         private LocalReadWriteProxyTransaction lastOpen;
83
84         private volatile LocalReadWriteProxyTransaction lastSealed;
85
86         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
87             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
88             super(parent, connection, identifier, dataTree);
89         }
90
91         @Override
92         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
93                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
94             checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
95
96             if (isDone) {
97                 // Done transactions do not register on our radar on should not have any state associated.
98                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
99                         : new LocalReadWriteProxyTransaction(this, txId);
100             }
101
102             // onTransactionCompleted() runs concurrently
103             final LocalReadWriteProxyTransaction localSealed = lastSealed;
104             final DataTreeSnapshot baseSnapshot;
105             if (localSealed != null) {
106                 baseSnapshot = localSealed.getSnapshot();
107             } else {
108                 baseSnapshot = takeSnapshot();
109             }
110
111             if (snapshotOnly) {
112                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
113             }
114
115             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
116             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
117             return lastOpen;
118         }
119
120         @Override
121         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
122             return createClient(parent(), connection, getIdentifier());
123         }
124
125         @Override
126         void onTransactionAborted(final AbstractProxyTransaction tx) {
127             if (tx.equals(lastOpen)) {
128                 lastOpen = null;
129             }
130         }
131
132         @Override
133         void onTransactionCompleted(final AbstractProxyTransaction tx) {
134             verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx);
135             if (tx instanceof LocalReadWriteProxyTransaction
136                     && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
137                 LOG.debug("Completed last sealed transaction {}", tx);
138             }
139         }
140
141         @Override
142         void onTransactionSealed(final AbstractProxyTransaction tx) {
143             checkState(tx.equals(lastOpen));
144             lastSealed = lastOpen;
145             lastOpen = null;
146         }
147     }
148
149     private static final class LocalSingle extends AbstractLocal {
150         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
151             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
152             super(parent, connection, identifier, dataTree);
153         }
154
155         @Override
156         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
157                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
158             final DataTreeSnapshot snapshot = takeSnapshot();
159             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
160                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
161         }
162
163         @Override
164         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
165             return createSingle(parent(), connection, getIdentifier());
166         }
167     }
168
169     private static final class Remote extends AbstractRemote {
170         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
171             final LocalHistoryIdentifier identifier) {
172             super(parent, connection, identifier);
173         }
174
175         @Override
176         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
177                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
178             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
179         }
180
181         @Override
182         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
183             return createClient(parent(), connection, getIdentifier());
184         }
185     }
186
187     private static final class RemoteSingle extends AbstractRemote {
188         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
189             final LocalHistoryIdentifier identifier) {
190             super(parent, connection, identifier);
191         }
192
193         @Override
194         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
195                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
196             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
197         }
198
199         @Override
200         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
201             return createSingle(parent(), connection, getIdentifier());
202         }
203     }
204
205     private static final class RequestReplayException extends RequestException {
206         private static final long serialVersionUID = 1L;
207
208         RequestReplayException(final String format, final Object... args) {
209             super(String.format(format, args));
210         }
211
212         @Override
213         public boolean isRetriable() {
214             return false;
215         }
216     }
217
218     private final class ReconnectCohort extends ProxyReconnectCohort {
219         @Override
220         public LocalHistoryIdentifier getIdentifier() {
221             return identifier;
222         }
223
224         @Holding("lock")
225         @Override
226         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
227             // First look for our Create message
228             Iterator<ConnectionEntry> it = previousEntries.iterator();
229             while (it.hasNext()) {
230                 final ConnectionEntry e = it.next();
231                 final Request<?, ?> req = e.getRequest();
232                 if (identifier.equals(req.getTarget())) {
233                     verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
234                     if (req instanceof CreateLocalHistoryRequest) {
235                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
236                         it.remove();
237                         break;
238                     }
239                 }
240             }
241
242             for (AbstractProxyTransaction t : proxies.values()) {
243                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
244                 t.replayMessages(successor, previousEntries);
245             }
246
247             // Now look for any finalizing messages
248             it = previousEntries.iterator();
249             while (it.hasNext()) {
250                 final ConnectionEntry e  = it.next();
251                 final Request<?, ?> req = e.getRequest();
252                 if (identifier.equals(req.getTarget())) {
253                     verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
254                     if (req instanceof DestroyLocalHistoryRequest) {
255                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
256                         it.remove();
257                         break;
258                     }
259                 }
260             }
261         }
262
263         @Holding("lock")
264         @Override
265         ProxyHistory finishReconnect() {
266             final ProxyHistory ret = verifyNotNull(successor);
267
268             for (AbstractProxyTransaction t : proxies.values()) {
269                 t.finishReconnect();
270             }
271
272             LOG.debug("Finished reconnecting proxy history {}", this);
273             lock.unlock();
274             return ret;
275         }
276
277         @Override
278         void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
279                 throws RequestException {
280             final Request<?, ?> request = entry.getRequest();
281             if (request instanceof TransactionRequest) {
282                 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
283                     entry.getEnqueuedTicks());
284             } else if (request instanceof LocalHistoryRequest) {
285                 replayTo.accept(entry);
286             } else {
287                 throw new IllegalArgumentException("Unhandled request " + request);
288             }
289         }
290
291         @Override
292         void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
293                 throws RequestException {
294             final Request<?, ?> request = entry.getRequest();
295             if (request instanceof TransactionRequest) {
296                 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
297             } else if (request instanceof LocalHistoryRequest) {
298                 forwardTo.accept(entry);
299             } else {
300                 throw new IllegalArgumentException("Unhandled request " + request);
301             }
302         }
303
304         private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
305                 throws RequestReplayException {
306             final AbstractProxyTransaction proxy;
307             lock.lock();
308             try {
309                 proxy = proxies.get(request.getTarget());
310             } finally {
311                 lock.unlock();
312             }
313             if (proxy != null) {
314                 return proxy;
315             }
316
317             throw new RequestReplayException("Failed to find proxy for %s", request);
318         }
319     }
320
321     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
322
323     private final Lock lock = new ReentrantLock();
324     private final @NonNull LocalHistoryIdentifier identifier;
325     private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
326     private final @NonNull AbstractClientHistory parent;
327
328     @GuardedBy("lock")
329     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
330     @GuardedBy("lock")
331     private ProxyHistory successor;
332
333     private ProxyHistory(final AbstractClientHistory parent,
334             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
335         this.parent = requireNonNull(parent);
336         this.connection = requireNonNull(connection);
337         this.identifier = requireNonNull(identifier);
338     }
339
340     static ProxyHistory createClient(final AbstractClientHistory parent,
341             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
342         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
343         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
344              : new Remote(parent, connection, identifier);
345     }
346
347     static ProxyHistory createSingle(final AbstractClientHistory parent,
348             final AbstractClientConnection<ShardBackendInfo> connection,
349             final LocalHistoryIdentifier identifier) {
350         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
351         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
352              : new RemoteSingle(parent, connection, identifier);
353     }
354
355     @Override
356     // Non-final for mocking
357     public LocalHistoryIdentifier getIdentifier() {
358         return identifier;
359     }
360
361     final ClientActorContext context() {
362         return connection.context();
363     }
364
365     final long currentTime() {
366         return connection.currentTime();
367     }
368
369     final ActorRef localActor() {
370         return connection.localActor();
371     }
372
373     final AbstractClientHistory parent() {
374         return parent;
375     }
376
377     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
378             final boolean snapshotOnly) {
379         return createTransactionProxy(txId, snapshotOnly, false);
380     }
381
382     // Non-final for mocking
383     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
384             final boolean isDone) {
385         lock.lock();
386         try {
387             if (successor != null) {
388                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
389             }
390
391             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
392             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
393             proxies.put(proxyId, ret);
394             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
395             return ret;
396         } finally {
397             lock.unlock();
398         }
399     }
400
401     final void abortTransaction(final AbstractProxyTransaction tx) {
402         lock.lock();
403         try {
404             // Removal will be completed once purge completes
405             LOG.debug("Proxy {} aborted transaction {}", this, tx);
406             onTransactionAborted(tx);
407         } finally {
408             lock.unlock();
409         }
410     }
411
412     final void completeTransaction(final AbstractProxyTransaction tx) {
413         lock.lock();
414         try {
415             // Removal will be completed once purge completes
416             LOG.debug("Proxy {} completing transaction {}", this, tx);
417             onTransactionCompleted(tx);
418         } finally {
419             lock.unlock();
420         }
421     }
422
423     final void purgeTransaction(final AbstractProxyTransaction tx) {
424         lock.lock();
425         try {
426             proxies.remove(tx.getIdentifier());
427             LOG.debug("Proxy {} purged transaction {}", this, tx);
428         } finally {
429             lock.unlock();
430         }
431     }
432
433     final void close() {
434         lock.lock();
435         try {
436             if (successor != null) {
437                 successor.close();
438                 return;
439             }
440
441             LOG.debug("Proxy {} invoking destroy", this);
442             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
443                 this::onDestroyComplete);
444         } finally {
445             lock.unlock();
446         }
447     }
448
449     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
450             final long enqueuedTicks) {
451         connection.enqueueRequest(request, callback, enqueuedTicks);
452     }
453
454     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
455         connection.sendRequest(request, callback);
456     }
457
458     @Holding("lock")
459     @SuppressWarnings("checkstyle:hiddenField")
460     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
461             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
462
463     @Holding("lock")
464     @SuppressWarnings("checkstyle:hiddenField")
465     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
466
467     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
468     final ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
469         lock.lock();
470         if (successor != null) {
471             lock.unlock();
472             throw new IllegalStateException("Proxy history " + this + " already has a successor");
473         }
474
475         successor = createSuccessor(newConnection);
476         LOG.debug("History {} instantiated successor {}", this, successor);
477
478         for (AbstractProxyTransaction t : proxies.values()) {
479             t.startReconnect();
480         }
481
482         return new ReconnectCohort();
483     }
484
485     private void onDestroyComplete(final Response<?, ?> response) {
486         LOG.debug("Proxy {} destroy completed with {}", this, response);
487
488         lock.lock();
489         try {
490             parent.onProxyDestroyed(this);
491             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
492                 this::onPurgeComplete);
493         } finally {
494             lock.unlock();
495         }
496     }
497
498     private void onPurgeComplete(final Response<?, ?> response) {
499         LOG.debug("Proxy {} purge completed with {}", this, response);
500     }
501
502     @Holding("lock")
503     void onTransactionAborted(final AbstractProxyTransaction tx) {
504         // No-op for most implementations
505     }
506
507     @Holding("lock")
508     void onTransactionCompleted(final AbstractProxyTransaction tx) {
509         // No-op for most implementations
510     }
511
512     @Holding("lock")
513     void onTransactionSealed(final AbstractProxyTransaction tx) {
514         // No-op on most implementations
515     }
516 }