import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
private final MessageSlicer responseMessageSlicer;
private final Dispatchers dispatchers;
+ private final MessageAssembler requestMessageAssembler;
+
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+ requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+ .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+ .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+ .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
}
private void setTransactionCommitTimeout() {
if (message instanceof RequestEnvelope) {
handleRequestEnvelope((RequestEnvelope)message);
+ } else if (requestMessageAssembler.isHandledMessage(message)) {
+ handleRequestAssemblerMessage(message);
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
} else if (CreateTransaction.isSerializedType(message)) {
}
}
+ private void handleRequestAssemblerMessage(Object message) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+ JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+ requestMessageAssembler.handleMessage(message, self());
+ });
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleRequestEnvelope(final RequestEnvelope envelope) {
final long now = ticker().read();
private void commitTimeoutCheck() {
store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ requestMessageAssembler.checkExpiredAssembledMessageState();
}
private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
knownFrontends = ImmutableMap.of();
}
+ requestMessageAssembler.close();
+
if (!hasLeader()) {
// No leader anywhere, nothing else to do
return;