Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / BouncingReconnectForwarder.java
index 3136023204a6b0a09703ff08cfbb483d5821559f..52bf1d930fd71b9cca62a3a4669c4c7760e155eb 100644 (file)
@@ -7,7 +7,8 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Maps;
 import java.util.Collection;
@@ -46,7 +47,7 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
     private BouncingReconnectForwarder(final ConnectedClientConnection<?> successor,
             final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts) {
         super(successor);
-        this.cohorts = Preconditions.checkNotNull(cohorts);
+        this.cohorts = requireNonNull(cohorts);
     }
 
     static ReconnectForwarder forCohorts(final ConnectedClientConnection<?> successor,
@@ -55,9 +56,25 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
     }
 
+    @Override
+    protected void forwardEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
 
     @Override
-    protected void forwardEntry(final ConnectionEntry entry) {
+    protected void replayEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).replayEntry(entry, this::replayToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
+
+    private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
         final Request<? , ?> request = entry.getRequest();
 
         final LocalHistoryIdentifier historyId;
@@ -69,18 +86,12 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
 
-        try {
-            final ProxyReconnectCohort cohort = cohorts.get(historyId);
-            if (cohort == null) {
-                LOG.warn("Cohort for request {} not found, aborting it", request);
-                throw new CohortNotFoundException(historyId);
-            }
-
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
-            cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor);
-        } catch (RequestException e) {
-            entry.complete(request.toRequestFailure(e));
+        final ProxyReconnectCohort cohort = cohorts.get(historyId);
+        if (cohort == null) {
+            LOG.warn("Cohort for request {} not found, aborting it", request);
+            throw new CohortNotFoundException(historyId);
         }
+
+        return cohort;
     }
-}
\ No newline at end of file
+}