Slice front-end request messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index c003d901af800c6df9505e25a52b3501c7a0e4ee..ebe5498481d9fe3befdabafbb4976f635a7b15b3 100644 (file)
@@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -82,6 +84,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -191,6 +194,8 @@ public class Shard extends RaftActor {
     private final MessageSlicer responseMessageSlicer;
     private final Dispatchers dispatchers;
 
+    private final MessageAssembler requestMessageAssembler;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -247,6 +252,11 @@ public class Shard extends RaftActor {
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+                .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+                .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
     }
 
     private void setTransactionCommitTimeout() {
@@ -301,6 +311,8 @@ public class Shard extends RaftActor {
 
             if (message instanceof RequestEnvelope) {
                 handleRequestEnvelope((RequestEnvelope)message);
+            } else if (requestMessageAssembler.isHandledMessage(message)) {
+                handleRequestAssemblerMessage(message);
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
             } else if (CreateTransaction.isSerializedType(message)) {
@@ -361,6 +373,13 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void handleRequestAssemblerMessage(Object message) {
+        dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+            JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+            requestMessageAssembler.handleMessage(message, self());
+        });
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleRequestEnvelope(final RequestEnvelope envelope) {
         final long now = ticker().read();
@@ -393,6 +412,7 @@ public class Shard extends RaftActor {
     private void commitTimeoutCheck() {
         store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
         commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+        requestMessageAssembler.checkExpiredAssembledMessageState();
     }
 
     private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
@@ -881,6 +901,8 @@ public class Shard extends RaftActor {
                 knownFrontends = ImmutableMap.of();
             }
 
+            requestMessageAssembler.close();
+
             if (!hasLeader()) {
                 // No leader anywhere, nothing else to do
                 return;