BUG-8403: do not throttle purge requests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index 8c3b485134c6f801f35f475c48cd33b1f5da3a6d..88a79775baa94730eba45a01ed8e229c0b701c69 100644 (file)
@@ -11,6 +11,8 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -21,6 +23,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
@@ -211,14 +214,17 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+        void replayRequests(final Collection<ConnectionEntry> previousEntries) {
             // First look for our Create message
-            for (ConnectionEntry e : previousEntries) {
+            Iterator<ConnectionEntry> it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
+                        it.remove();
                         break;
                     }
                 }
@@ -233,11 +239,17 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
 
             // Now look for any finalizing messages
-            for (ConnectionEntry e : previousEntries) {
+            it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e  = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
-                    successor.connection.sendRequest(req, e.getCallback());
+                    if (req instanceof DestroyLocalHistoryRequest) {
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
+                        it.remove();
+                        break;
+                    }
                 }
             }
         }
@@ -327,6 +339,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return identifier;
     }
 
+    final ClientActorContext context() {
+        return connection.context();
+    }
+
     final long currentTime() {
         return connection.currentTime();
     }
@@ -360,8 +376,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
-            LOG.debug("Proxy {} aborting transaction {}", this, tx);
+            // Removal will be completed once purge completes
+            LOG.debug("Proxy {} aborted transaction {}", this, tx);
             onTransactionAborted(tx);
         } finally {
             lock.unlock();