Adjust to yangtools-2.0.0 changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 768b19fa3e7f31b971142bcd4c49968200c36124..d7bbc1ca3c19a7ffb59f8424635fcfc153ecd0cc 100644 (file)
@@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -58,6 +60,7 @@ import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -82,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -99,8 +103,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
@@ -160,6 +164,8 @@ public class Shard extends RaftActor {
 
     private final ShardStats shardMBean;
 
+    private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
+
     private DatastoreContext datastoreContext;
 
     private final ShardCommitCoordinator commitCoordinator;
@@ -191,6 +197,8 @@ public class Shard extends RaftActor {
     private final MessageSlicer responseMessageSlicer;
     private final Dispatchers dispatchers;
 
+    private final MessageAssembler requestMessageAssembler;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -247,6 +255,15 @@ public class Shard extends RaftActor {
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+                .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+                .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
+
+        listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(),
+                self());
+        listenerInfoMXBean.register();
     }
 
     private void setTransactionCommitTimeout() {
@@ -275,6 +292,7 @@ public class Shard extends RaftActor {
         commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
 
         shardMBean.unregisterMBean();
+        listenerInfoMXBean.unregister();
     }
 
     @Override
@@ -301,6 +319,8 @@ public class Shard extends RaftActor {
 
             if (message instanceof RequestEnvelope) {
                 handleRequestEnvelope((RequestEnvelope)message);
+            } else if (MessageAssembler.isHandledMessage(message)) {
+                handleRequestAssemblerMessage(message);
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
             } else if (CreateTransaction.isSerializedType(message)) {
@@ -329,8 +349,7 @@ public class Shard extends RaftActor {
                 PeerAddressResolved resolved = (PeerAddressResolved) message;
                 setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
             } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
-                store.checkForExpiredTransactions(transactionCommitTimeout);
-                commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+                commitTimeoutCheck();
             } else if (message instanceof DatastoreContext) {
                 onDatastoreContext((DatastoreContext)message);
             } else if (message instanceof RegisterRoleChangeListener) {
@@ -362,6 +381,13 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void handleRequestAssemblerMessage(final Object message) {
+        dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+            JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+            requestMessageAssembler.handleMessage(message, self());
+        });
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleRequestEnvelope(final RequestEnvelope envelope) {
         final long now = ticker().read();
@@ -391,6 +417,30 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void commitTimeoutCheck() {
+        store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
+        commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+        requestMessageAssembler.checkExpiredAssembledMessageState();
+    }
+
+    private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+        final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+        final LeaderFrontendState state = knownFrontends.get(frontend);
+        if (state == null) {
+            // Not tell-based protocol, do nothing
+            return Optional.absent();
+        }
+
+        if (isIsolatedLeader()) {
+            // We are isolated and no new request can come through until we emerge from it. We are still updating
+            // liveness of frontend when we see it attempting to communicate. Use the last access timer.
+            return Optional.of(state.getLastSeenTicks());
+        }
+
+        // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+        return Optional.of(state.getLastConnectTicks());
+    }
+
     private void onMakeLeaderLocal() {
         LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
         if (isLeader()) {
@@ -452,7 +502,8 @@ public class Shard extends RaftActor {
         throw new OutOfSequenceEnvelopeException(0);
     }
 
-    private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+    @Nonnull
+    private static ABIVersion selectVersion(final ConnectClientRequest message) {
         final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
         for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
             if (clientRange.contains(v)) {
@@ -500,7 +551,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+    @Nullable
+    private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || paused || !isLeaderActive()) {
@@ -633,7 +685,7 @@ public class Shard extends RaftActor {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(batched, getSender(),
-                        "Could not commit transaction " + batched.getTransactionId());
+                        "Could not process BatchedModifications " + batched.getTransactionId());
             } else {
                 // If this is not the first batch and leadership changed in between batched messages,
                 // we need to reconstruct previous BatchedModifications from the transaction
@@ -686,7 +738,7 @@ public class Shard extends RaftActor {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(message, getSender(),
-                        "Could not commit transaction " + message.getTransactionId());
+                        "Could not process ready local transaction " + message.getTransactionId());
             } else {
                 LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
@@ -705,7 +757,7 @@ public class Shard extends RaftActor {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
-                        "Could not commit transaction " + forwardedReady.getTransactionId());
+                        "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
             } else {
                 LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
 
@@ -789,8 +841,11 @@ public class Shard extends RaftActor {
     @Override
     @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
-        return new ShardRecoveryCoordinator(store,
-            restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
+        if (restoreFromSnapshot == null) {
+            return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
+        }
+
+        return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot());
     }
 
     @Override
@@ -859,6 +914,8 @@ public class Shard extends RaftActor {
                 knownFrontends = ImmutableMap.of();
             }
 
+            requestMessageAssembler.close();
+
             if (!hasLeader()) {
                 // No leader anywhere, nothing else to do
                 return;
@@ -927,7 +984,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
         return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
                 .dataChangeListenerActors(changeSupport.getListenerActors())
                 .commitCohortActors(store.getCohortActors());
@@ -968,7 +1025,7 @@ public class Shard extends RaftActor {
         private DatastoreContext datastoreContext;
         private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
-        private TipProducingDataTree dataTree;
+        private DataTree dataTree;
         private volatile boolean sealed;
 
         protected AbstractBuilder(final Class<S> shardClass) {
@@ -1002,9 +1059,9 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
+        public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
             checkSealed();
-            this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
+            this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
             return self();
         }
 
@@ -1014,7 +1071,7 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T dataTree(final TipProducingDataTree newDataTree) {
+        public T dataTree(final DataTree newDataTree) {
             checkSealed();
             this.dataTree = newDataTree;
             return self();
@@ -1040,7 +1097,7 @@ public class Shard extends RaftActor {
             return restoreFromSnapshot;
         }
 
-        public TipProducingDataTree getDataTree() {
+        public DataTree getDataTree() {
             return dataTree;
         }
 
@@ -1071,7 +1128,7 @@ public class Shard extends RaftActor {
     }
 
     public static class Builder extends AbstractBuilder<Builder, Shard> {
-        private Builder() {
+        Builder() {
             super(Shard.class);
         }
     }