BUG-5280: handle NotLeaderException 47/56047/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 14 Apr 2017 18:45:20 +0000 (20:45 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 27 Apr 2017 09:13:46 +0000 (09:13 +0000)
NotLeaderException is indicative of leader movement, in which
case we need to tear down the connection and resolve the new
leader.

Change-Id: I068e97f9a7feb75cc30afb5f5449f0adf00aa217
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 8265c26f7692086677fa943976824966f32eecf6)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java

index 69deb01..28d8a1b 100644 (file)
@@ -124,7 +124,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     @GuardedBy("lock")
-    abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
+    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
 
     private long readTime() {
         return context.ticker().read();
@@ -152,6 +152,15 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
+    final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
+        lock.lock();
+        try {
+            return lockedReconnect(current);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /**
      * Schedule a timer to fire on the actor thread after a delay.
      *
@@ -204,7 +213,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             delay = lockedCheckTimeout(now);
             if (delay == null) {
                 // We have timed out. There is no point in scheduling a timer
-                return reconnectConnection(current);
+                return lockedReconnect(current);
             }
 
             if (delay.isPresent()) {
index 896d85b..5233c07 100644 (file)
@@ -17,6 +17,7 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -105,6 +106,11 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         }
     }
 
+    private AbstractClientConnection<T> getConnection(final ResponseEnvelope<?> response) {
+        // Always called from actor context: no locking required
+        return connections.get(extractCookie(response.getMessage().getTarget()));
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     final ClientActorBehavior<T> onReceiveCommand(final Object command) {
@@ -132,8 +138,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     }
 
     private void onResponse(final ResponseEnvelope<?> response) {
-        final long cookie = extractCookie(response.getMessage().getTarget());
-        final AbstractClientConnection<T> connection = connections.get(cookie);
+        final AbstractClientConnection<T> connection = getConnection(response);
         if (connection != null) {
             connection.receiveResponse(response);
         } else {
@@ -160,6 +165,16 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             poison(cause);
             return null;
         }
+        if (cause instanceof NotLeaderException) {
+            final AbstractClientConnection<T> conn = getConnection(command);
+            if (conn instanceof ReconnectingClientConnection) {
+                // Already reconnecting, do not churn the logs
+                return this;
+            } else if (conn != null) {
+                LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause);
+                return conn.reconnect(this);
+            }
+        }
 
         return onRequestFailure(command);
     }
index 9198d8f..cade27f 100644 (file)
@@ -18,7 +18,7 @@ public final class ConnectedClientConnection<T extends BackendInfo> extends Abst
     }
 
     @Override
-    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
         final ReconnectingClientConnection<T> next = new ReconnectingClientConnection<>(this);
         setForwarder(new SimpleReconnectForwarder(next));
         current.reconnectConnection(this, next);
index aa98687..12c520b 100644 (file)
@@ -30,7 +30,7 @@ public final class ConnectingClientConnection<T extends BackendInfo> extends Abs
     }
 
     @Override
-    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
         throw new UnsupportedOperationException("Attempted to reconnect a connecting connection");
     }
 }
index a67b7ed..0aac7f4 100644 (file)
@@ -25,7 +25,7 @@ public final class ReconnectingClientConnection<T extends BackendInfo> extends A
     }
 
     @Override
-    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
         // Intentional no-op
         LOG.debug("Skipping reconnect of already-reconnecting connection {}", this);
         return current;
index 60cdc3a..2c1159e 100644 (file)
@@ -28,7 +28,7 @@ public class ConnectedClientConnectionTest
     @Test
     public void testReconnectConnection() throws Exception {
         final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
-        connection.reconnectConnection(behavior);
+        connection.lockedReconnect(behavior);
         verify(behavior).reconnectConnection(same(connection), any(ReconnectingClientConnection.class));
     }
 
index b2ff723..142e948 100644 (file)
@@ -41,7 +41,7 @@ public class ReconnectingClientConnectionTest
     @Test
     public void testReconnectConnection() throws Exception {
         final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
-        Assert.assertSame(behavior, connection.reconnectConnection(behavior));
+        Assert.assertSame(behavior, connection.lockedReconnect(behavior));
     }
 
     @Override

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.