+ this.resolver = requireNonNull(resolver);
+
+ final ClientActorConfig config = context.config();
+ responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId())
+ .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+ config.getTempFileDirectory()))
+ .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
+
+ staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> {
+ context().executeInActor(behavior -> {
+ LOG.debug("BackendInfo for shard {} is now stale", shard);
+ final AbstractClientConnection<T> conn = connections.get(shard);
+ if (conn instanceof ConnectedClientConnection) {
+ conn.reconnect(this, new BackendStaleException(shard));
+ }
+ return behavior;
+ });
+ });