BUG-8491: Remove requests as they are replayed
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index 827c19e526fcbd5f6b3cc1c33dfdc40698af11e9..802d9ed0b33bc3a1fb5716a62d84bf03ffd266d3 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -213,12 +214,15 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
             // First look for our Create message
-            for (ConnectionEntry e : previousEntries) {
+            Iterator<ConnectionEntry> it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
                         successor.connection.sendRequest(req, e.getCallback());
+                        it.remove();
                         break;
                     }
                 }
@@ -233,11 +237,17 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
 
             // Now look for any finalizing messages
-            for (ConnectionEntry e : previousEntries) {
+            it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e  = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
-                    successor.connection.sendRequest(req, e.getCallback());
+                    if (req instanceof DestroyLocalHistoryRequest) {
+                        successor.connection.sendRequest(req, e.getCallback());
+                        it.remove();
+                        break;
+                    }
                 }
             }
         }
@@ -259,6 +269,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
+            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
+            //        period required to get into the queue.
             if (request instanceof TransactionRequest) {
                 forwardTransactionRequest((TransactionRequest<?>) request, callback);
             } else if (request instanceof LocalHistoryRequest) {
@@ -325,6 +337,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return identifier;
     }
 
+    final long currentTime() {
+        return connection.currentTime();
+    }
+
     final ActorRef localActor() {
         return connection.localActor();
     }
@@ -389,6 +405,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        connection.enqueueRequest(request, callback, enqueuedTicks);
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }