BUG-5280: expose queue messages during reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index 07fcbebad01a263d38a0bfabe839e8b02832ffe2..b3b604b7f08ad7e42abb4a224626673b97b8b8e3 100644 (file)
@@ -22,6 +22,8 @@ import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -195,12 +197,33 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replaySuccessfulRequests() {
+        void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+            // First look for our Create message
+            for (ConnectionEntry e : previousEntries) {
+                final Request<?, ?> req = e.getRequest();
+                if (identifier.equals(req.getTarget())) {
+                    Verify.verify(req instanceof LocalHistoryRequest);
+                    if (req instanceof CreateLocalHistoryRequest) {
+                        successor.connection.sendRequest(req, e.getCallback());
+                        break;
+                    }
+                }
+            }
+
             for (AbstractProxyTransaction t : proxies.values()) {
                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.startReconnect(newProxy);
+                t.replayMessages(newProxy, previousEntries);
+            }
+
+            // Now look for any finalizing messages
+            for (ConnectionEntry e : previousEntries) {
+                final Request<?, ?> req = e.getRequest();
+                if (identifier.equals(req.getTarget())) {
+                    Verify.verify(req instanceof LocalHistoryRequest);
+                    successor.connection.sendRequest(req, e.getCallback());
+                }
             }
         }
 
@@ -347,6 +370,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         successor = createSuccessor(newConnection);
         LOG.debug("History {} instantiated successor {}", this, successor);
+
+        for (AbstractProxyTransaction t : proxies.values()) {
+            t.startReconnect();
+        }
+
         return new ReconnectCohort();
     }