Do not assert seal transition on forward path
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / BouncingReconnectForwarder.java
index c6564b66fbf507a9f8d9c0d778284e18853c1527..26e346e77cb4e0a0a19b6c66b5f5903d141a4803 100644 (file)
@@ -26,16 +26,20 @@ import org.slf4j.LoggerFactory;
 // Cohort aware forwarder, which forwards the request to the cohort, giving it a reference to the successor
 // connection
 final class BouncingReconnectForwarder extends ReconnectForwarder {
-    private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
-
-    private static final RequestException FAILED_TO_REPLAY_EXCEPTION = new RequestException("Cohort not found") {
+    private static final class CohortNotFoundException extends RequestException {
         private static final long serialVersionUID = 1L;
 
+        CohortNotFoundException(final LocalHistoryIdentifier historyId) {
+            super("Cohort for " + historyId + " not found");
+        }
+
         @Override
         public boolean isRetriable() {
             return false;
         }
-    };
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
 
     private final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts;
 
@@ -51,9 +55,25 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
     }
 
+    @Override
+    protected void forwardEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
 
     @Override
-    protected void forwardEntry(final ConnectionEntry entry) {
+    protected void replayEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).replayEntry(entry, this::replayToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
+
+    private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
         final Request<? , ?> request = entry.getRequest();
 
         final LocalHistoryIdentifier historyId;
@@ -65,18 +85,12 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
 
-        try {
-            final ProxyReconnectCohort cohort = cohorts.get(historyId);
-            if (cohort == null) {
-                LOG.warn("Cohort for request {} not found, aborting it", request);
-                throw FAILED_TO_REPLAY_EXCEPTION;
-            }
-
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
-            cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor);
-        } catch (RequestException e) {
-            entry.complete(request.toRequestFailure(e));
+        final ProxyReconnectCohort cohort = cohorts.get(historyId);
+        if (cohort == null) {
+            LOG.warn("Cohort for request {} not found, aborting it", request);
+            throw new CohortNotFoundException(historyId);
         }
+
+        return cohort;
     }
-}
\ No newline at end of file
+}