X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;fp=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=ddf1dc190b8ed1a4ea3c70b2a1386a078f426053;hp=fa2e3b76d8a038497d57efd7e344498862717d06;hb=1819f12a7e562482d9328a8e4c4aeffd2b1a2c01;hpb=6e21e18b47cfca090d452eec30ec403809139ada;ds=sidebyside diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index fa2e3b76d8..ddf1dc190b 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -83,6 +84,7 @@ public abstract class ClientActorBehavior extends private final InversibleLock connectionsLock = new InversibleLock(); private final BackendInfoResolver resolver; private final MessageAssembler responseMessageAssembler; + private final Registration staleBackendInfoReg; protected ClientActorBehavior(@Nonnull final ClientActorContext context, @Nonnull final BackendInfoResolver resolver) { @@ -94,6 +96,17 @@ public abstract class ClientActorBehavior extends .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), config.getTempFileDirectory())) .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build(); + + staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> { + context().executeInActor(behavior -> { + LOG.debug("BackendInfo for shard {} is now stale", shard); + final AbstractClientConnection conn = connections.get(shard); + if (conn instanceof ConnectedClientConnection) { + conn.reconnect(this, new BackendStaleException(shard)); + } + return behavior; + }); + }); } @Override @@ -104,7 +117,9 @@ public abstract class ClientActorBehavior extends @Override public void close() { + super.close(); responseMessageAssembler.close(); + staleBackendInfoReg.close(); } /** @@ -437,4 +452,17 @@ public abstract class ClientActorBehavior extends return behavior; })); } + + private static class BackendStaleException extends RequestException { + private static final long serialVersionUID = 1L; + + BackendStaleException(final Long shard) { + super("Backend for shard " + shard + " is stale"); + } + + @Override + public boolean isRetriable() { + return false; + } + } }