Do not assert seal transition on forward path
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index b529d94c2b226bb879b9c0f3cf4baa07a7414359..ad105c31f2f194ff610bac75ff8c3d432324b853 100644 (file)
@@ -19,7 +19,6 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
@@ -86,9 +85,15 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
 
+            if (isDone) {
+                // Done transactions do not register on our radar on should not have any state associated.
+                return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
+                        : new LocalReadWriteProxyTransaction(this, txId);
+            }
+
             // onTransactionCompleted() runs concurrently
             final LocalReadWriteProxyTransaction localSealed = lastSealed;
             final DataTreeSnapshot baseSnapshot;
@@ -122,10 +127,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         void onTransactionCompleted(final AbstractProxyTransaction tx) {
             Verify.verify(tx instanceof LocalProxyTransaction);
-            if (tx instanceof LocalReadWriteProxyTransaction) {
-                if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
-                    LOG.debug("Completed last sealed transaction {}", tx);
-                }
+            if (tx instanceof LocalReadWriteProxyTransaction
+                    && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+                LOG.debug("Completed last sealed transaction {}", tx);
             }
         }
 
@@ -145,7 +149,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
             final DataTreeSnapshot snapshot = takeSnapshot();
             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
@@ -165,8 +169,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
         }
 
         @Override
@@ -183,8 +187,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
         }
 
         @Override
@@ -223,7 +227,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
@@ -231,11 +235,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
 
             for (AbstractProxyTransaction t : proxies.values()) {
-                LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
-                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
-                    t.isSnapshotOnly());
-                LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.replayMessages(newProxy, previousEntries);
+                LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+                t.replayMessages(successor, previousEntries);
             }
 
             // Now look for any finalizing messages
@@ -246,7 +247,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof DestroyLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
@@ -269,22 +270,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
+        void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
             if (request instanceof TransactionRequest) {
-                forwardTransactionRequest((TransactionRequest<?>) request, callback);
+                lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+                    entry.getEnqueuedTicks());
             } else if (request instanceof LocalHistoryRequest) {
-                forwardTo.accept(request, callback);
+                replayTo.accept(entry);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void forwardTransactionRequest(final TransactionRequest<?> request,
-                final Consumer<Response<?, ?>> callback) throws RequestException {
+        @Override
+        void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
+            if (request instanceof TransactionRequest) {
+                lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+            } else if (request instanceof LocalHistoryRequest) {
+                forwardTo.accept(entry);
+            } else {
+                throw new IllegalArgumentException("Unhandled request " + request);
+            }
+        }
 
+        private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+                throws RequestReplayException {
             final AbstractProxyTransaction proxy;
             lock.lock();
             try {
@@ -292,11 +305,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             } finally {
                 lock.unlock();
             }
-            if (proxy == null) {
-                throw new RequestReplayException("Failed to find proxy for %s", request);
+            if (proxy != null) {
+                return proxy;
             }
 
-            proxy.forwardRequest(request, callback);
+            throw new RequestReplayException("Failed to find proxy for %s", request);
         }
     }
 
@@ -357,14 +370,19 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
+        return createTransactionProxy(txId, snapshotOnly, false);
+    }
+
+    AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
+            final boolean isDone) {
         lock.lock();
         try {
             if (successor != null) {
-                return successor.createTransactionProxy(txId, snapshotOnly);
+                return successor.createTransactionProxy(txId, snapshotOnly, isDone);
             }
 
             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
+            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
             proxies.put(proxyId, ret);
             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
             return ret;
@@ -376,8 +394,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
-            LOG.debug("Proxy {} aborting transaction {}", this, tx);
+            // Removal will be completed once purge completes
+            LOG.debug("Proxy {} aborted transaction {}", this, tx);
             onTransactionAborted(tx);
         } finally {
             lock.unlock();
@@ -387,7 +405,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void completeTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
+            // Removal will be completed once purge completes
             LOG.debug("Proxy {} completing transaction {}", this, tx);
             onTransactionCompleted(tx);
         } finally {
@@ -395,6 +413,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    void purgeTransaction(final AbstractProxyTransaction tx) {
+        lock.lock();
+        try {
+            proxies.remove(tx.getIdentifier());
+            LOG.debug("Proxy {} purged transaction {}", this, tx);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void close() {
         lock.lock();
         try {
@@ -421,9 +449,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     @GuardedBy("lock")
+    @SuppressWarnings("checkstyle:hiddenField")
     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
-            TransactionIdentifier txId, boolean snapshotOnly);
+            TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
 
+    @SuppressWarnings("checkstyle:hiddenField")
     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
 
     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")