Slice front-end request messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index d9483d7b2b61915295eef094fc8c8570f0b63eaa..ebe5498481d9fe3befdabafbb4976f635a7b15b3 100644 (file)
@@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -36,6 +38,7 @@ import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
@@ -81,6 +84,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -190,6 +194,8 @@ public class Shard extends RaftActor {
     private final MessageSlicer responseMessageSlicer;
     private final Dispatchers dispatchers;
 
+    private final MessageAssembler requestMessageAssembler;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -246,6 +252,11 @@ public class Shard extends RaftActor {
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+                .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+                .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
     }
 
     private void setTransactionCommitTimeout() {
@@ -300,6 +311,8 @@ public class Shard extends RaftActor {
 
             if (message instanceof RequestEnvelope) {
                 handleRequestEnvelope((RequestEnvelope)message);
+            } else if (requestMessageAssembler.isHandledMessage(message)) {
+                handleRequestAssemblerMessage(message);
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
             } else if (CreateTransaction.isSerializedType(message)) {
@@ -328,8 +341,7 @@ public class Shard extends RaftActor {
                 PeerAddressResolved resolved = (PeerAddressResolved) message;
                 setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
             } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
-                store.checkForExpiredTransactions(transactionCommitTimeout);
-                commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+                commitTimeoutCheck();
             } else if (message instanceof DatastoreContext) {
                 onDatastoreContext((DatastoreContext)message);
             } else if (message instanceof RegisterRoleChangeListener) {
@@ -361,6 +373,13 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void handleRequestAssemblerMessage(Object message) {
+        dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+            JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+            requestMessageAssembler.handleMessage(message, self());
+        });
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleRequestEnvelope(final RequestEnvelope envelope) {
         final long now = ticker().read();
@@ -390,6 +409,30 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void commitTimeoutCheck() {
+        store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
+        commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+        requestMessageAssembler.checkExpiredAssembledMessageState();
+    }
+
+    private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+        final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+        final LeaderFrontendState state = knownFrontends.get(frontend);
+        if (state == null) {
+            // Not tell-based protocol, do nothing
+            return Optional.absent();
+        }
+
+        if (isIsolatedLeader()) {
+            // We are isolated and no new request can come through until we emerge from it. We are still updating
+            // liveness of frontend when we see it attempting to communicate. Use the last access timer.
+            return Optional.of(state.getLastSeenTicks());
+        }
+
+        // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+        return Optional.of(state.getLastConnectTicks());
+    }
+
     private void onMakeLeaderLocal() {
         LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
         if (isLeader()) {
@@ -417,11 +460,13 @@ public class Shard extends RaftActor {
     }
 
     // Acquire our frontend tracking handle and verify generation matches
-    private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+    @Nullable
+    private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
         if (existing != null) {
             final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
             if (cmp == 0) {
+                existing.touch();
                 return existing;
             }
             if (cmp > 0) {
@@ -436,10 +481,17 @@ public class Shard extends RaftActor {
             LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
         }
 
-        final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
-        knownFrontends.put(clientId.getFrontendId(), ret);
-        LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
-        return ret;
+        return null;
+    }
+
+    private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+        final LeaderFrontendState ret = findFrontend(clientId);
+        if (ret != null) {
+            return ret;
+        }
+
+        // TODO: a dedicated exception would be better, but this is technically true, too
+        throw new OutOfSequenceEnvelopeException(0);
     }
 
     private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
@@ -458,6 +510,12 @@ public class Shard extends RaftActor {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleConnectClient(final ConnectClientRequest message) {
         try {
+            final ClientIdentifier clientId = message.getTarget();
+            final LeaderFrontendState existing = findFrontend(clientId);
+            if (existing != null) {
+                existing.touch();
+            }
+
             if (!isLeader() || !isLeaderActive()) {
                 LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
                                 + "isLeadershipTransferInProgress: {}.",
@@ -466,7 +524,15 @@ public class Shard extends RaftActor {
             }
 
             final ABIVersion selectedVersion = selectVersion(message);
-            final LeaderFrontendState frontend = getFrontend(message.getTarget());
+            final LeaderFrontendState frontend;
+            if (existing == null) {
+                frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+                knownFrontends.put(clientId.getFrontendId(), frontend);
+                LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
+            } else {
+                frontend = existing;
+            }
+
             frontend.reconnect();
             message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
                 ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
@@ -835,6 +901,8 @@ public class Shard extends RaftActor {
                 knownFrontends = ImmutableMap.of();
             }
 
+            requestMessageAssembler.close();
+
             if (!hasLeader()) {
                 // No leader anywhere, nothing else to do
                 return;