BUG-8452: make NoShardLeaderException retriable
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnection.java
index 47c0676979b94ea291532e5b713aee0bbcbece56..cd81a4e5444ca9b47a1f51a94ff79796afa106a2 100644 (file)
@@ -13,9 +13,10 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -26,6 +27,7 @@ import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -156,32 +158,25 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     public abstract Optional<T> getBackendInfo();
 
-    final Iterable<ConnectionEntry> startReplay() {
+    final Collection<ConnectionEntry> startReplay() {
         lock.lock();
-        return queue.asIterable();
+        return queue.drain();
     }
 
     @GuardedBy("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder);
+        setForwarder(forwarder);
         lock.unlock();
     }
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        final long now = currentTime();
-        final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
-        while (it.hasNext()) {
-            final ConnectionEntry e = it.next();
-            forwarder.forwardEntry(e, now);
-            it.remove();
-        }
-
-        queue.setForwarder(forwarder);
+        queue.setForwarder(forwarder, currentTime());
     }
 
     @GuardedBy("lock")
-    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
+    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
+            RequestException runtimeRequestException);
 
     final long enqueueEntry(final ConnectionEntry entry, final long now) {
         lock.lock();
@@ -201,10 +196,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
-    final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
+    final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current, final RequestException cause) {
         lock.lock();
         try {
-            return lockedReconnect(current);
+            return lockedReconnect(current, cause);
         } finally {
             lock.unlock();
         }
@@ -269,7 +264,8 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             delay = lockedCheckTimeout(now);
             if (delay == null) {
                 // We have timed out. There is no point in scheduling a timer
-                return lockedReconnect(current);
+                return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+                    new TimeoutException()));
             }
 
             if (delay.isPresent()) {
@@ -347,10 +343,14 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     private void lockedPoison(final RequestException cause) {
-        poisoned = cause;
+        poisoned = enrichPoison(cause);
         queue.poison(cause);
     }
 
+    RequestException enrichPoison(final RequestException ex) {
+        return ex;
+    }
+
     @VisibleForTesting
     final RequestException poisoned() {
         return poisoned;