import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import org.slf4j.Logger;
private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
private final InversibleLock connectionsLock = new InversibleLock();
private final BackendInfoResolver<T> resolver;
+ private final MessageAssembler responseMessageAssembler;
protected ClientActorBehavior(@Nonnull final ClientActorContext context,
@Nonnull final BackendInfoResolver<T> resolver) {
super(context);
this.resolver = Preconditions.checkNotNull(resolver);
+
+ final ClientActorConfig config = context.config();
+ responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId())
+ .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+ config.getTempFileDirectory()))
+ .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
}
@Override
return context().getIdentifier();
}
+ @Override
+ public void close() {
+ responseMessageAssembler.close();
+ }
+
/**
* Get a connection to a shard.
*
if (command instanceof InternalCommand) {
return ((InternalCommand<T>) command).execute(this);
}
+
if (command instanceof SuccessEnvelope) {
return onRequestSuccess((SuccessEnvelope) command);
}
+
if (command instanceof FailureEnvelope) {
return internalOnRequestFailure((FailureEnvelope) command);
}
+ if (MessageAssembler.isHandledMessage(command)) {
+ context().dispatchers().getDispatcher(DispatcherType.Serialization).execute(
+ () -> responseMessageAssembler.handleMessage(command, context().self()));
+ return this;
+ }
+
return onCommand(command);
}