tell-based - reconnect on leader change
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorBehavior.java
index fa2e3b76d8a038497d57efd7e344498862717d06..ddf1dc190b8ed1a4ea3c70b2a1386a078f426053 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -83,6 +84,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     private final InversibleLock connectionsLock = new InversibleLock();
     private final BackendInfoResolver<T> resolver;
     private final MessageAssembler responseMessageAssembler;
+    private final Registration staleBackendInfoReg;
 
     protected ClientActorBehavior(@Nonnull final ClientActorContext context,
             @Nonnull final BackendInfoResolver<T> resolver) {
@@ -94,6 +96,17 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
                         config.getTempFileDirectory()))
                 .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
+
+        staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> {
+            context().executeInActor(behavior -> {
+                LOG.debug("BackendInfo for shard {} is now stale", shard);
+                final AbstractClientConnection<T> conn = connections.get(shard);
+                if (conn instanceof ConnectedClientConnection) {
+                    conn.reconnect(this, new BackendStaleException(shard));
+                }
+                return behavior;
+            });
+        });
     }
 
     @Override
@@ -104,7 +117,9 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
 
     @Override
     public void close() {
+        super.close();
         responseMessageAssembler.close();
+        staleBackendInfoReg.close();
     }
 
     /**
@@ -437,4 +452,17 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             return behavior;
         }));
     }
+
+    private static class BackendStaleException extends RequestException {
+        private static final long serialVersionUID = 1L;
+
+        BackendStaleException(final Long shard) {
+            super("Backend for shard " + shard + " is stale");
+        }
+
+        @Override
+        public boolean isRetriable() {
+            return false;
+        }
+    }
 }