BUG-5280: handle NotLeaderException 64/55064/4
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 14 Apr 2017 18:45:20 +0000 (20:45 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 17 Apr 2017 17:56:23 +0000 (17:56 +0000)
NotLeaderException is indicative of leader movement, in which
case we need to tear down the connection and resolve the new
leader.

Change-Id: I068e97f9a7feb75cc30afb5f5449f0adf00aa217
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java

index 69deb01910d7ee27c235e2ae9b291418f1b02360..28d8a1b42228690177ae9498e7c77dcf518558ce 100644 (file)
@@ -124,7 +124,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     @GuardedBy("lock")
-    abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
+    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
 
     private long readTime() {
         return context.ticker().read();
@@ -152,6 +152,15 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
+    final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
+        lock.lock();
+        try {
+            return lockedReconnect(current);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /**
      * Schedule a timer to fire on the actor thread after a delay.
      *
@@ -204,7 +213,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             delay = lockedCheckTimeout(now);
             if (delay == null) {
                 // We have timed out. There is no point in scheduling a timer
-                return reconnectConnection(current);
+                return lockedReconnect(current);
             }
 
             if (delay.isPresent()) {
index 896d85b713fcecfb611cc0d23987a973125718d1..5233c0795aee29c8c858c1cb0fd4be44d5ac13ca 100644 (file)
@@ -17,6 +17,7 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -105,6 +106,11 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         }
     }
 
+    private AbstractClientConnection<T> getConnection(final ResponseEnvelope<?> response) {
+        // Always called from actor context: no locking required
+        return connections.get(extractCookie(response.getMessage().getTarget()));
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     final ClientActorBehavior<T> onReceiveCommand(final Object command) {
@@ -132,8 +138,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     }
 
     private void onResponse(final ResponseEnvelope<?> response) {
-        final long cookie = extractCookie(response.getMessage().getTarget());
-        final AbstractClientConnection<T> connection = connections.get(cookie);
+        final AbstractClientConnection<T> connection = getConnection(response);
         if (connection != null) {
             connection.receiveResponse(response);
         } else {
@@ -160,6 +165,16 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             poison(cause);
             return null;
         }
+        if (cause instanceof NotLeaderException) {
+            final AbstractClientConnection<T> conn = getConnection(command);
+            if (conn instanceof ReconnectingClientConnection) {
+                // Already reconnecting, do not churn the logs
+                return this;
+            } else if (conn != null) {
+                LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause);
+                return conn.reconnect(this);
+            }
+        }
 
         return onRequestFailure(command);
     }
index 9198d8fe12c66980802917085bc4c548b8b5e484..cade27f77d6070c58df5cb80e0c9a920652cbf15 100644 (file)
@@ -18,7 +18,7 @@ public final class ConnectedClientConnection<T extends BackendInfo> extends Abst
     }
 
     @Override
-    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
         final ReconnectingClientConnection<T> next = new ReconnectingClientConnection<>(this);
         setForwarder(new SimpleReconnectForwarder(next));
         current.reconnectConnection(this, next);
index aa986873d57e03f841e27986d24f57fa5e9cf402..12c520bb17f2af39bbb845660eda161a03ba1585 100644 (file)
@@ -30,7 +30,7 @@ public final class ConnectingClientConnection<T extends BackendInfo> extends Abs
     }
 
     @Override
-    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
         throw new UnsupportedOperationException("Attempted to reconnect a connecting connection");
     }
 }
index a67b7ed3be27bdd629f3105ad62d7650359f5d3a..0aac7f46637fc49716ca094ce46bc42baca1a4b0 100644 (file)
@@ -25,7 +25,7 @@ public final class ReconnectingClientConnection<T extends BackendInfo> extends A
     }
 
     @Override
-    ClientActorBehavior<T> reconnectConnection(final ClientActorBehavior<T> current) {
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
         // Intentional no-op
         LOG.debug("Skipping reconnect of already-reconnecting connection {}", this);
         return current;
index 60cdc3acc83ba365ac9cedd64554320eea9b666f..2c1159ef93977dc2efd223da6ac006c9b261885d 100644 (file)
@@ -28,7 +28,7 @@ public class ConnectedClientConnectionTest
     @Test
     public void testReconnectConnection() throws Exception {
         final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
-        connection.reconnectConnection(behavior);
+        connection.lockedReconnect(behavior);
         verify(behavior).reconnectConnection(same(connection), any(ReconnectingClientConnection.class));
     }
 
index b2ff72368c3f5e37b707a438b36d24e85079f489..142e94882059c0954f8734348c84803a1f585047 100644 (file)
@@ -41,7 +41,7 @@ public class ReconnectingClientConnectionTest
     @Test
     public void testReconnectConnection() throws Exception {
         final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
-        Assert.assertSame(behavior, connection.reconnectConnection(behavior));
+        Assert.assertSame(behavior, connection.lockedReconnect(behavior));
     }
 
     @Override