BUG-8618: add pause/unpause mechanics for tell-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index fbd5b6456a79ae7c94a140008fda80db952d1dbc..318a4e68e793e01c01cc0316099f8134d2aebec1 100644 (file)
@@ -98,6 +98,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -180,6 +181,7 @@ public class Shard extends RaftActor {
 
     private final FrontendMetadata frontendMetadata;
     private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+    private boolean paused;
 
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
@@ -431,7 +433,9 @@ public class Shard extends RaftActor {
     private void handleConnectClient(final ConnectClientRequest message) {
         try {
             if (!isLeader() || !isLeaderActive()) {
-                LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+                LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+                                + "isLeadershipTransferInProgress: {}.",
+                        persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
                 throw new NotLeaderException(getSelf());
             }
 
@@ -449,8 +453,10 @@ public class Shard extends RaftActor {
     private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
-        if (!isLeader() || !isLeaderActive()) {
-            LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+        if (!isLeader() || paused || !isLeaderActive()) {
+            LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+                            + "isLeadershipTransferInProgress: {}, paused: {}",
+                    persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
             throw new NotLeaderException(getSelf());
         }
 
@@ -783,6 +789,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
+            paused = false;
             store.purgeLeaderState();
         }
 
@@ -794,19 +801,19 @@ public class Shard extends RaftActor {
     @Override
     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
+        paused = false;
 
-        final boolean hasLeader = hasLeader();
-        if (!hasLeader) {
-            // No leader implies we are not the leader, lose frontend state if we have any. This also places
-            // an explicit guard so the map will not get modified accidentally.
+        if (!isLeader()) {
             if (!knownFrontends.isEmpty()) {
                 LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
                 knownFrontends = ImmutableMap.of();
             }
-            return;
-        }
 
-        if (!isLeader()) {
+            if (!hasLeader()) {
+                // No leader anywhere, nothing else to do
+                return;
+            }
+
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
@@ -849,9 +856,26 @@ public class Shard extends RaftActor {
     @Override
     protected void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+        paused = true;
+
+        // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+        knownFrontends.values().forEach(LeaderFrontendState::retire);
+        knownFrontends = ImmutableMap.of();
+
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
+    @Override
+    protected void unpauseLeader() {
+        LOG.debug("{}: In unpauseLeader", persistenceId());
+        paused = false;
+
+        store.setRunOnPendingTransactionsComplete(null);
+
+        // Restore tell-based protocol state as if we were becoming the leader
+        knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+    }
+
     @Override
     protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
         return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
@@ -892,7 +916,7 @@ public class Shard extends RaftActor {
         private ShardIdentifier id;
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
-        private SchemaContext schemaContext;
+        private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private TipProducingDataTree dataTree;
         private volatile boolean sealed;
@@ -928,9 +952,9 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T schemaContext(final SchemaContext newSchemaContext) {
+        public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
             checkSealed();
-            this.schemaContext = newSchemaContext;
+            this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
             return self();
         }
 
@@ -959,7 +983,7 @@ public class Shard extends RaftActor {
         }
 
         public SchemaContext getSchemaContext() {
-            return schemaContext;
+            return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
@@ -986,7 +1010,7 @@ public class Shard extends RaftActor {
             Preconditions.checkNotNull(id, "id should not be null");
             Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
             Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
-            Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+            Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
         }
 
         public Props props() {