import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
private final InversibleLock connectionsLock = new InversibleLock();
private final BackendInfoResolver<T> resolver;
private final MessageAssembler responseMessageAssembler;
+ private final Registration staleBackendInfoReg;
protected ClientActorBehavior(@Nonnull final ClientActorContext context,
@Nonnull final BackendInfoResolver<T> resolver) {
.fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
config.getTempFileDirectory()))
.assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
+
+ staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> {
+ context().executeInActor(behavior -> {
+ LOG.debug("BackendInfo for shard {} is now stale", shard);
+ final AbstractClientConnection<T> conn = connections.get(shard);
+ if (conn instanceof ConnectedClientConnection) {
+ conn.reconnect(this, new BackendStaleException(shard));
+ }
+ return behavior;
+ });
+ });
}
@Override
@Override
public void close() {
+ super.close();
responseMessageAssembler.close();
+ staleBackendInfoReg.close();
}
/**
return behavior;
}));
}
+
+ private static class BackendStaleException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ BackendStaleException(final Long shard) {
+ super("Backend for shard " + shard + " is stale");
+ }
+
+ @Override
+ public boolean isRetriable() {
+ return false;
+ }
+ }
}