BUG-8403: move successor allocation to AbstractProxyTransaction
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index 88e86bc08985f00310444854b20a20e129bd1d50..4764f24d495991f0082526fef2b569a62b828835 100644 (file)
@@ -11,6 +11,8 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -21,6 +23,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
@@ -211,33 +214,39 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+        void replayRequests(final Collection<ConnectionEntry> previousEntries) {
             // First look for our Create message
-            for (ConnectionEntry e : previousEntries) {
+            Iterator<ConnectionEntry> it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
+                        it.remove();
                         break;
                     }
                 }
             }
 
             for (AbstractProxyTransaction t : proxies.values()) {
-                LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
-                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
-                    t.isSnapshotOnly());
-                LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.replayMessages(newProxy, previousEntries);
+                LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+                t.replayMessages(successor, previousEntries);
             }
 
             // Now look for any finalizing messages
-            for (ConnectionEntry e : previousEntries) {
+            it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e  = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
-                    successor.connection.sendRequest(req, e.getCallback());
+                    if (req instanceof DestroyLocalHistoryRequest) {
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
+                        it.remove();
+                        break;
+                    }
                 }
             }
         }
@@ -257,18 +266,20 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
+        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
+            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
+            //        period required to get into the queue.
             if (request instanceof TransactionRequest) {
-                replayTransactionRequest((TransactionRequest<?>) request, callback);
+                forwardTransactionRequest((TransactionRequest<?>) request, callback);
             } else if (request instanceof LocalHistoryRequest) {
-                replayTo.accept(request, callback);
+                forwardTo.accept(request, callback);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void replayTransactionRequest(final TransactionRequest<?> request,
+        private void forwardTransactionRequest(final TransactionRequest<?> request,
                 final Consumer<Response<?, ?>> callback) throws RequestException {
 
             final AbstractProxyTransaction proxy;
@@ -282,7 +293,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 throw new RequestReplayException("Failed to find proxy for %s", request);
             }
 
-            proxy.replayRequest(request, callback);
+            proxy.forwardRequest(request, callback);
         }
     }
 
@@ -325,6 +336,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return identifier;
     }
 
+    final ClientActorContext context() {
+        return connection.context();
+    }
+
+    final long currentTime() {
+        return connection.currentTime();
+    }
+
     final ActorRef localActor() {
         return connection.localActor();
     }
@@ -333,7 +352,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return parent;
     }
 
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+    AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
         lock.lock();
         try {
@@ -354,8 +373,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
-            LOG.debug("Proxy {} aborting transaction {}", this, tx);
+            // Removal will be completed once purge completes
+            LOG.debug("Proxy {} aborted transaction {}", this, tx);
             onTransactionAborted(tx);
         } finally {
             lock.unlock();
@@ -389,6 +408,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        connection.enqueueRequest(request, callback, enqueuedTicks);
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }