tell-based - reconnect on leader change
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorBehavior.java
index 3f6515cbb8b20ce22183b857eccacd6acb50223c..b5e21b6ddd625481ef2a3950f4a986786b96e9fa 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -83,6 +84,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     private final InversibleLock connectionsLock = new InversibleLock();
     private final BackendInfoResolver<T> resolver;
     private final MessageAssembler responseMessageAssembler;
+    private final Registration staleBackendInfoReg;
 
     protected ClientActorBehavior(@Nonnull final ClientActorContext context,
             @Nonnull final BackendInfoResolver<T> resolver) {
@@ -94,6 +96,17 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
                         config.getTempFileDirectory()))
                 .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
+
+        staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> {
+            context().executeInActor(behavior -> {
+                LOG.debug("BackendInfo for shard {} is now stale", shard);
+                final AbstractClientConnection<T> conn = connections.get(shard);
+                if (conn instanceof ConnectedClientConnection) {
+                    conn.reconnect(this, new BackendStaleException(shard));
+                }
+                return behavior;
+            });
+        });
     }
 
     @Override
@@ -104,7 +117,9 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
 
     @Override
     public void close() {
+        super.close();
         responseMessageAssembler.close();
+        staleBackendInfoReg.close();
     }
 
     /**
@@ -437,4 +452,17 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             return behavior;
         }));
     }
+
+    private static class BackendStaleException extends RequestException {
+        private static final long serialVersionUID = 1L;
+
+        BackendStaleException(final Long shard) {
+            super("Backend for shard " + shard + " is stale");
+        }
+
+        @Override
+        public boolean isRetriable() {
+            return false;
+        }
+    }
 }