Leader should always apply modifications as local 76/82576/24
author Tomas Cere <tomas.cere@pantheon.tech>
Tue, 18 Jun 2019 12:54:56 +0000 (14:54 +0200)
committer Robert Varga <robert.varga@pantheon.tech>
Sun, 19 Jan 2020 13:27:25 +0000 (14:27 +0100)
Normally an entry application is as follows:
1. leader sends an append entry off to persistence and replicates it to
   followers
2. leader creates its ClientRequestTracker
3. when the entry is done with persistence and replication, the leader moves
   its commit index
4. part of moving the commit index is sending an ApplyState message which
   finalizes the entry application in the DataTree
5. The ApplyState determines if a ClientRequestTracker is present and adds
   an identifier to the ApplyState message if it is. This determines the way
   in which the finalize of the entry application happens in the DataTree.
   If it is present the entry is applied as if it originated on the leader,
   if it is not present it is applied as if the node is a follower.

The problem occurs when the leader flaps in a leader -> follower -> leader
transition after step 2 and before step 4.

This would mean that the new leader no longer has the ClientRequestTracker
which was created in the previous leader state, which means that when it
starts with 5. it will create the ApplyState without an identifier
and the entry finishes up the application as if the node is a follower.

This means that it will be applied without finishCommit, which means that
the transaction will be stuck in COMMIT_PENDING state forever, until
the node is restarted.

Change this so that the leader will apply modifications as local even
when it loses its ClientRequestTracker, and add Identifiable to payloads
which require it.

Since this code path should never occur when we are a candidate, catch this
transition. As ClientRequestTracker becomes an optional entity, we hide that
as well.

JIRA: CONTROLLER-1927
Change-Id: I636f998cd62ec82ef02193261624e4a51275fb86
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java

index 32e2e09c3fe2b13482a4f02a2a460e5ef45643dd..6560ad76c3937285300173f16df53f4c60d9ae1d 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -55,6 +56,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -433,8 +436,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
     }
 
-    @Override
-    protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
+    /**
+     * Removes and returns the ClientRequestTracker for the specified log index.
+     * @param logIndex the log index
+     * @return the ClientRequestTracker or null if none available
+     */
+    private ClientRequestTracker removeClientRequestTracker(final long logIndex) {
         final Iterator<ClientRequestTracker> it = trackers.iterator();
         while (it.hasNext()) {
             final ClientRequestTracker t = it.next();
@@ -447,6 +454,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return null;
     }
 
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        // first check whether a ClientRequestTracker exists for this entry.
+        // If it does that means the leader wasn't dropped before the transaction applied.
+        // That means that this transaction can be safely applied as a local transaction since we
+        // have the ClientRequestTracker.
+        final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex());
+        if (tracker != null) {
+            return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry);
+        }
+
+        // Tracker is missing, this means that we switched behaviours between replicate and applystate
+        // and became the leader again. We still want to apply this as a local modification because
+        // we have resumed leadership with that log entry having been committed.
+        final Payload payload = entry.getData();
+        if (payload instanceof IdentifiablePayload) {
+            return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
+        }
+
+        return new ApplyState(null, null, entry);
+    }
+
     @Override
     protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
         return this;
index 3440de9dd9bc808c232bbcff54808940aa83667e..fd2fbd332c7a58bab6f60b01e37b2193ad98c3e7 100644 (file)
@@ -18,7 +18,6 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -324,15 +323,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return context.getReplicatedLog().lastIndex();
     }
 
-    /**
-     * Removes and returns the ClientRequestTracker for the specified log index.
-     * @param logIndex the log index
-     * @return the ClientRequestTracker or null if none available
-     */
-    protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
-        return null;
-    }
-
     /**
      * Returns the actual index of the entry in replicated log for the given index or -1 if not found.
      *
@@ -397,13 +387,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 // Send a local message to the local RaftActor (it's derived class to be
                 // specific to apply the log to it's index)
 
-                final ApplyState applyState;
-                final ClientRequestTracker tracker = removeClientRequestTracker(i);
-                if (tracker != null) {
-                    applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
-                } else {
-                    applyState = new ApplyState(null, null, replicatedLogEntry);
-                }
+                final ApplyState applyState = getApplyStateFor(replicatedLogEntry);
 
                 log.debug("{}: Setting last applied to {}", logName(), i);
 
@@ -425,6 +409,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
     }
 
+    /**
+     * Create an ApplyState message for a particular log entry so we can determine how to apply this entry.
+     *
+     * @param entry the log entry
+     * @return ApplyState for this entry
+     */
+    abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry);
+
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof AppendEntries) {
index afa46892bea33754340f7ef6891f8cdbe646ed61..a8762ec76edb11e755731379960a7014f215a598 100644 (file)
@@ -15,6 +15,8 @@ import java.util.Collection;
 import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -50,7 +52,7 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     private final Collection<String> votingPeers = new ArrayList<>();
 
-    public Candidate(RaftActorContext context) {
+    public Candidate(final RaftActorContext context) {
         super(context, RaftState.Candidate);
 
         for (PeerInfo peer: context.getPeers()) {
@@ -83,7 +85,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
+    protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
 
         log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
@@ -99,12 +101,13 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+    protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+            final AppendEntriesReply appendEntriesReply) {
         return this;
     }
 
     @Override
-    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
+    protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
         log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
 
         if (requestVoteReply.isVoteGranted()) {
@@ -129,8 +132,14 @@ public class Candidate extends AbstractRaftActorBehavior {
         return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
     }
 
+
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        throw new IllegalStateException("A candidate should never attempt to apply " + entry);
+    }
+
     @Override
-    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+    public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout) {
             log.debug("{}: Received ElectionTimeout", logName());
 
@@ -178,10 +187,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-
     private void startNewTerm() {
-
-
         // set voteCount back to 1 (that is voting for self)
         voteCount = 1;
 
index d88c30db333bcf040b20febc76884c9bc13e9147..b642ee43a563588f7a95c08e4b97495b94526186 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -435,6 +436,11 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        return new ApplyState(null, null, entry);
+    }
+
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java
new file mode 100644 (file)
index 0000000..a323fba
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+public abstract class IdentifiablePayload<T extends Identifier> extends Payload implements Identifiable<T> {
+}
index b970ba4485bb759707cca1f0bb1bfbe3db82eeb2..fc65743e7bb6c53a2b42623495b0675db5577139 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
index 4ca35098c5b4d6189adf006ec3444de689397001..32e155bf03fc50ea89dc0932c25c51797d76af87 100644 (file)
@@ -18,6 +18,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -27,8 +28,8 @@ import org.opendaylight.yangtools.concepts.Identifier;
  *
  * @author Robert Varga
  */
-public abstract class AbstractIdentifiablePayload<T extends Identifier>
-        extends Payload implements Identifiable<T>, Serializable {
+public abstract class AbstractIdentifiablePayload<T extends Identifier> extends IdentifiablePayload<T>
+        implements Serializable {
     protected abstract static class AbstractProxy<T extends Identifier> implements Externalizable {
         private static final long serialVersionUID = 1L;
         private byte[] serialized;
index 73bdd6f31baa98539d1c312a648732d4f1dd316e..b2164133b5b1209c53484852414bf88bc84a43a1 100644 (file)
@@ -27,7 +27,7 @@ import java.io.StreamCorruptedException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
 import org.opendaylight.yangtools.concepts.Variant;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -42,10 +42,13 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 @Beta
-public abstract class CommitTransactionPayload extends Payload implements Serializable {
+public abstract class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
+        implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
+    private volatile Entry<TransactionIdentifier, DataTreeCandidate> candidate = null;
+
     CommitTransactionPayload() {
 
     }
@@ -71,7 +74,16 @@ public abstract class CommitTransactionPayload extends Payload implements Serial
     }
 
     public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate() throws IOException {
-        return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
+        Entry<TransactionIdentifier, DataTreeCandidate> localCandidate = candidate;
+        if (localCandidate == null) {
+            synchronized (this) {
+                localCandidate = candidate;
+                if (localCandidate == null) {
+                    candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
+                }
+            }
+        }
+        return localCandidate;
     }
 
     public final Entry<TransactionIdentifier, DataTreeCandidate> getCandidate(
@@ -81,6 +93,14 @@ public abstract class CommitTransactionPayload extends Payload implements Serial
                 DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
     }
 
+    public TransactionIdentifier getIdentifier() {
+        try  {
+            return getCandidate().getKey();
+        } catch (IOException e) {
+            throw new IllegalStateException("Candidate deserialization failed.", e);
+        }
+    }
+
     abstract void writeBytes(ObjectOutput out) throws IOException;
 
     abstract DataInput newDataInput();
index 270c99d86cb58396d36815748a1ff4fa96624aec..ac870905adcf1e781d49da672fb7561a22d6b2f9 100644 (file)
@@ -36,7 +36,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class ShardInformation {
+@VisibleForTesting
+public final class ShardInformation {
     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
 
     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
@@ -89,7 +90,8 @@ final class ShardInformation {
         return shardName;
     }
 
-    @Nullable ActorRef getActor() {
+    @VisibleForTesting
+    @Nullable public ActorRef getActor() {
         return actor;
     }
 
index fee555470f7fefd74dcc136c7595f1f0023d776c..2cc30a271cec446c3a05bbb546792d65f9511b66 100644 (file)
@@ -357,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         configUpdateHandler.initListener(dataStore, datastoreType);
     }
 
-    private void onShutDown() {
+    void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
index 919e4d82c7cac901b4697b11a428f278839dff10..9b790ce5c86a7555cafd8da547d857bb57bba4be 100644 (file)
@@ -12,13 +12,17 @@ import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.LocalShardStore;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShards;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
-public class TestClientBackedDataStore extends ClientBackedDataStore {
+public class TestClientBackedDataStore extends ClientBackedDataStore implements LocalShardStore {
+
     public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
                                      final Configuration configuration,
                                      final DatastoreContextFactory datastoreContextFactory,
@@ -35,4 +39,10 @@ public class TestClientBackedDataStore extends ClientBackedDataStore {
     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
         return new TestShardManager.TestShardManagerCreator();
     }
+
+    @Override
+    public GetLocalShardsReply getLocalShards() {
+        final ActorUtils utils = getActorUtils();
+        return (GetLocalShardsReply) utils.executeOperation(utils.getShardManager(), GetLocalShards.INSTANCE);
+    }
 }
index e403bbc20a2ccc5837121d6e502eaa72d2b53c2d..0c74c71d832988b5959f1119da2c5e9016b98985 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -46,7 +47,10 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import org.junit.After;
@@ -67,6 +71,8 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
+import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
+import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -84,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -1364,6 +1371,70 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
     }
 
+    @SuppressWarnings("IllegalCatch")
+    @Test
+    public void testRaftCallbackDuringLeadershipDrop() throws Exception {
+        final String testName = "testRaftCallbackDuringLeadershipDrop";
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
+                        .shardLeaderElectionTimeoutInSeconds(3600),
+                commitTimeout);
+
+        final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        leaderTestKit.doCommit(initialWriteTx.ready());
+
+        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+                testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
+
+            final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
+            member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
+
+            final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+            newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            final AtomicBoolean submitDone = new AtomicBoolean(false);
+            executor.submit(() -> {
+                try {
+                    leaderTestKit.doCommit(newTx.ready());
+                    submitDone.set(true);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            await().atMost(10, TimeUnit.SECONDS)
+                    .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                            .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
+
+            final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
+                    .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
+
+            // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
+            // new term(switching to candidate after election timeout)
+            leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
+                    "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
+                            -1), member3Cars);
+
+            member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
+            member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
+
+            await("Is tx stuck in COMMIT_PENDING")
+                    .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
+
+        }
+
+        executor.shutdownNow();
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java
new file mode 100644 (file)
index 0000000..9d34902
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
+
+public interface LocalShardStore {
+
+    GetLocalShardsReply getLocalShards();
+}
index 36064145dd1805aad3a8772115dd621a4cacb80b..882d0e176b4efb822858fd0e3a679af90be2a32f 100644 (file)
@@ -15,7 +15,7 @@ import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardM
 import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
-public class TestDistributedDataStore extends DistributedDataStore {
+public class TestDistributedDataStore extends DistributedDataStore implements LocalShardStore {
 
     public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
                                     final Configuration configuration,
@@ -32,4 +32,13 @@ public class TestDistributedDataStore extends DistributedDataStore {
     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
         return new TestShardManager.TestShardManagerCreator();
     }
+
+    @Override
+    public TestShardManager.GetLocalShardsReply getLocalShards() {
+        TestShardManager.GetLocalShardsReply reply =
+            (TestShardManager.GetLocalShardsReply) getActorUtils()
+                .executeOperation(getActorUtils().getShardManager(), TestShardManager.GetLocalShards.INSTANCE);
+
+        return reply;
+    }
 }
index b0e744a24add221131082faba9e3144f8b35c497..9eb20c03960792898a611d89736a6d4a3b205031 100644 (file)
@@ -7,14 +7,51 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 
 public class TestShard extends Shard {
+    /**
+     * {@link Shard.Builder} specialization that passes {@code TestShard.class} to the parent builder.
+     * NOTE(review): presumably this makes the builder produce {@link TestShard} actors; confirm against
+     * {@code Shard.Builder}.
+     */
+    public static class Builder extends Shard.Builder {
+        Builder() {
+            super(TestShard.class);
+        }
+    }
+
     // Message to request FrontendMetadata
     public static final class RequestFrontendMetadata {
 
     }
 
+    /**
+     * Common base of the control messages that identify a message class.
+     *
+     * @param <T> type of the messages the carried class describes
+     */
+    private abstract static class DropMessages<T> {
+        private final Class<T> msgClass;
+
+        /**
+         * Stores the message class this control message refers to.
+         *
+         * @param msgClass the message class, must not be null
+         * @throws NullPointerException if {@code msgClass} is null
+         */
+        DropMessages(final Class<T> msgClass) {
+            this.msgClass = requireNonNull(msgClass);
+        }
+
+        /**
+         * Returns the message class supplied at construction.
+         *
+         * @return the message class, never null
+         */
+        final Class<T> getMsgClass() {
+            return msgClass;
+        }
+    }
+
+    /**
+     * Control message which, when handled by {@link TestShard}, makes the shard drop every subsequent message
+     * whose runtime class is exactly the given class.
+     *
+     * @param <T> type of the messages to drop
+     */
+    public static class StartDropMessages<T> extends DropMessages<T> {
+        /**
+         * Creates the control message.
+         *
+         * @param msgClass class of the messages to drop, must not be null
+         */
+        public StartDropMessages(final Class<T> msgClass) {
+            super(msgClass);
+        }
+    }
+
+    /**
+     * Control message which, when handled by {@link TestShard}, removes the drop filter registered for the given
+     * message class.
+     *
+     * @param <T> type of the messages that should no longer be dropped
+     */
+    public static class StopDropMessages<T> extends DropMessages<T> {
+        /**
+         * Creates the control message.
+         *
+         * @param msgClass class of the messages to stop dropping, must not be null
+         */
+        public StopDropMessages(final Class<T> msgClass) {
+            super(msgClass);
+        }
+    }
+
+    // Drop filters keyed by message class; a message is dropped when the filter registered for its exact
+    // runtime class accepts it. NOTE(review): a concurrent map is used presumably because the non-private
+    // start/stop methods may be called from outside the actor's thread; confirm against callers.
+    private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
+
     protected TestShard(AbstractBuilder<?, ?> builder) {
         super(builder);
     }
@@ -29,14 +66,37 @@ public class TestShard extends Shard {
         }
     }
 
-    public static Shard.Builder builder() {
-        return new TestShard.Builder();
+    /**
+     * Intercepts the drop-control messages and routes every other message through the drop filters.
+     *
+     * @param message the incoming message
+     */
+    @Override
+    protected void handleCommand(Object message) {
+        if (message instanceof StartDropMessages) {
+            // Control message: register an unconditional drop filter for the named class.
+            startDropMessages(((StartDropMessages<?>) message).getMsgClass());
+            return;
+        }
+        if (message instanceof StopDropMessages) {
+            // Control message: remove the filter for the named class.
+            stopDropMessages(((StopDropMessages<?>) message).getMsgClass());
+            return;
+        }
+
+        dropOrHandle(message);
     }
 
-    public static class Builder extends Shard.Builder {
-        Builder() {
-            super(TestShard.class);
+    /**
+     * Forwards {@code message} to the parent handler unless a registered drop filter rejects it.
+     *
+     * <p>The filter is looked up by the exact runtime class of the message. When no filter is registered, or
+     * the filter does not accept the message, the message is handled normally; otherwise it is silently
+     * discarded.
+     *
+     * @param message the incoming message
+     * @param <T> static type of the message
+     */
+    private <T> void dropOrHandle(T message) {
+        // The cast is safe: filters are only registered either as accept-everything predicates or through
+        // startDropMessages(Class<T>, Predicate<T>), which ties the predicate type to the key class, and the
+        // lookup below uses the exact runtime class of the message.
+        @SuppressWarnings("unchecked")
+        Predicate<T> drop = (Predicate<T>) dropMessages.get(message.getClass());
+        if (drop == null || !drop.test(message)) {
+            super.handleCommand(message);
         }
     }
-}
 
+    /**
+     * Registers a filter that drops every message of the given class, replacing any filter already registered
+     * for it.
+     *
+     * @param msgClass class of the messages to drop
+     */
+    private void startDropMessages(final Class<?> msgClass) {
+        dropMessages.put(msgClass, msg -> true);
+    }
+
+    /**
+     * Registers a filter for the given message class, replacing any filter already registered for it. Messages
+     * of that class are dropped whenever {@code filter} returns {@code true} for them.
+     *
+     * @param msgClass class of the messages to filter
+     * @param filter predicate selecting the messages to drop
+     * @param <T> type of the messages to filter
+     */
+    <T> void startDropMessages(final Class<T> msgClass, final Predicate<T> filter) {
+        dropMessages.put(msgClass, filter);
+    }
+
+    /**
+     * Removes the drop filter registered for the given message class, if there is one.
+     *
+     * @param msgClass class of the messages that should no longer be dropped
+     */
+    public void stopDropMessages(final Class<?> msgClass) {
+        dropMessages.remove(msgClass);
+    }
+
+    /**
+     * Creates a new {@link TestShard.Builder}.
+     *
+     * @return a fresh builder instance
+     */
+    public static TestShard.Builder builder() {
+        return new TestShard.Builder();
+    }
+}
index 0783c1646408ebcf1e36c0a252a661fe14192634..337c4c95b9eacaadf5629de076abb0c4077c5c60 100644 (file)
@@ -20,6 +20,15 @@ public class TestShardManager extends ShardManager {
         super(builder);
     }
 
+    /**
+     * Answers {@link GetLocalShards} requests with a {@link GetLocalShardsReply} built from this manager's
+     * {@code localShards}; every other message is passed to the parent implementation.
+     *
+     * @param message the incoming message
+     * @throws Exception if the parent handler fails
+     */
+    @Override
+    public void handleCommand(Object message) throws Exception {
+        if (!GetLocalShards.INSTANCE.equals(message)) {
+            super.handleCommand(message);
+            return;
+        }
+
+        // The reply is sent without nominating a sender.
+        final GetLocalShardsReply reply = new GetLocalShardsReply(localShards);
+        sender().tell(reply, ActorRef.noSender());
+    }
+
     /**
      * Plug into shard actor creation to replace info with our testing one.
      * @param info shard info.
@@ -27,10 +36,12 @@ public class TestShardManager extends ShardManager {
      */
     @Override
     protected ActorRef newShardActor(ShardInformation info) {
+        Map<String, String> peerAddresses = getPeerAddresses(info.getShardName());
         ShardInformation newInfo = new ShardInformation(info.getShardName(),
-                info.getShardId(), getPeerAddresses(info.getShardName()),
+                info.getShardId(), peerAddresses,
                 info.getDatastoreContext(),
-                TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
+                TestShard.builder()
+                        .restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
                 peerAddressResolver);
         newInfo.setSchemaContext(info.getSchemaContext());
         newInfo.setActiveMember(info.isActiveMember());
@@ -58,4 +69,25 @@ public class TestShardManager extends ShardManager {
             return Props.create(TestShardManager.class, this);
         }
     }
+
+    /**
+     * Singleton request message; {@link TestShardManager} answers it with a {@link GetLocalShardsReply}.
+     */
+    public static final class GetLocalShards {
+        public static final GetLocalShards INSTANCE = new GetLocalShards();
+
+        private GetLocalShards() {
+            // Not instantiable from outside; use INSTANCE.
+        }
+    }
+
+    /**
+     * Reply to {@link GetLocalShards}, carrying a snapshot of the shard table it was constructed with.
+     */
+    public static class GetLocalShardsReply {
+
+        private final Map<String, ShardInformation> localShards;
+
+        /**
+         * Captures an unmodifiable snapshot of the supplied shard table.
+         *
+         * @param localShards shard information keyed by string (NOTE(review): presumably the shard name; confirm)
+         * @throws NullPointerException if {@code localShards} is null
+         */
+        public GetLocalShardsReply(Map<String, ShardInformation> localShards) {
+            // This object is sent as a message, and the handler in this file passes the shard manager's own
+            // map. Copy it so the receiver never sees (or causes) later changes to that map. The
+            // ShardInformation values themselves are still shared.
+            // Fully-qualified names keep this change independent of the file's import block.
+            this.localShards = java.util.Collections.unmodifiableMap(new java.util.HashMap<>(localShards));
+        }
+
+        /**
+         * Returns the shard table captured at construction.
+         *
+         * @return an unmodifiable map of shard information
+         */
+        public Map<String, ShardInformation> getLocalShards() {
+            return localShards;
+        }
+    }
 }