Fixup "Leader should always apply modifications as local" regression 16/87616/4
authorTomas Cere <tomas.cere@pantheon.tech>
Wed, 12 Feb 2020 12:23:22 +0000 (13:23 +0100)
committerTomas Cere <tomas.cere@pantheon.tech>
Fri, 20 Mar 2020 11:38:07 +0000 (11:38 +0000)
This reintroduces the original commit:
 e66759266dc43d5f58b2837aca5047b42c205e4a
and fixes up the two bugs it introduced.

First one didn't account for the fact that the new leader might have not
tracked forwarded transactions from the previous leader which would
be ignored. Previously not tracked transactions are now applied
normally on the new leader.

Second issue was that we started keeping around deserialized
candidates in memory in each CommitTransactionPayload even after they
were applied and no longer needed.During the final use of the
candidate in ShardDataTree the candidate is now clearedduring
CommitTransactionPayload.acquireCandidate() which allows the
deserialized candidate to be cleared by GC.

JIRA: CONTROLLER-1927
JIRA: CONTROLLER-1928
Change-Id: I47876d4d29a8e9b6b9283d70e47108f3dc2ed3ee
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java

index 32e2e09..6560ad7 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -55,6 +56,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -433,8 +436,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
     }
 
-    @Override
-    protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
+    /**
+     * Removes and returns the ClientRequestTracker for the specified log index.
+     * @param logIndex the log index
+     * @return the ClientRequestTracker or null if none available
+     */
+    private ClientRequestTracker removeClientRequestTracker(final long logIndex) {
         final Iterator<ClientRequestTracker> it = trackers.iterator();
         while (it.hasNext()) {
             final ClientRequestTracker t = it.next();
@@ -447,6 +454,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return null;
     }
 
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        // first check whether a ClientRequestTracker exists for this entry.
+        // If it does that means the leader wasn't dropped before the transaction applied.
+        // That means that this transaction can be safely applied as a local transaction since we
+        // have the ClientRequestTracker.
+        final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex());
+        if (tracker != null) {
+            return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry);
+        }
+
+        // Tracker is missing, this means that we switched behaviours between replicate and applystate
+        // and became the leader again,. We still want to apply this as a local modification because
+        // we have resumed leadership with that log entry having been committed.
+        final Payload payload = entry.getData();
+        if (payload instanceof IdentifiablePayload) {
+            return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
+        }
+
+        return new ApplyState(null, null, entry);
+    }
+
     @Override
     protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
         return this;
index 3440de9..fd2fbd3 100644 (file)
@@ -18,7 +18,6 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -324,15 +323,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return context.getReplicatedLog().lastIndex();
     }
 
-    /**
-     * Removes and returns the ClientRequestTracker for the specified log index.
-     * @param logIndex the log index
-     * @return the ClientRequestTracker or null if none available
-     */
-    protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
-        return null;
-    }
-
     /**
      * Returns the actual index of the entry in replicated log for the given index or -1 if not found.
      *
@@ -397,13 +387,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 // Send a local message to the local RaftActor (it's derived class to be
                 // specific to apply the log to it's index)
 
-                final ApplyState applyState;
-                final ClientRequestTracker tracker = removeClientRequestTracker(i);
-                if (tracker != null) {
-                    applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
-                } else {
-                    applyState = new ApplyState(null, null, replicatedLogEntry);
-                }
+                final ApplyState applyState = getApplyStateFor(replicatedLogEntry);
 
                 log.debug("{}: Setting last applied to {}", logName(), i);
 
@@ -425,6 +409,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
     }
 
+    /**
+     * Create an ApplyState message for a particular log entry so we can determine how to apply this entry.
+     *
+     * @param entry the log entry
+     * @return ApplyState for this entry
+     */
+    abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry);
+
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof AppendEntries) {
index afa4689..a8762ec 100644 (file)
@@ -15,6 +15,8 @@ import java.util.Collection;
 import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -50,7 +52,7 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     private final Collection<String> votingPeers = new ArrayList<>();
 
-    public Candidate(RaftActorContext context) {
+    public Candidate(final RaftActorContext context) {
         super(context, RaftState.Candidate);
 
         for (PeerInfo peer: context.getPeers()) {
@@ -83,7 +85,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
+    protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
 
         log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
@@ -99,12 +101,13 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+    protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+            final AppendEntriesReply appendEntriesReply) {
         return this;
     }
 
     @Override
-    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
+    protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
         log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
 
         if (requestVoteReply.isVoteGranted()) {
@@ -129,8 +132,14 @@ public class Candidate extends AbstractRaftActorBehavior {
         return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
     }
 
+
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        throw new IllegalStateException("A candidate should never attempt to apply " + entry);
+    }
+
     @Override
-    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+    public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout) {
             log.debug("{}: Received ElectionTimeout", logName());
 
@@ -178,10 +187,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-
     private void startNewTerm() {
-
-
         // set voteCount back to 1 (that is voting for self)
         voteCount = 1;
 
index d88c30d..b642ee4 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -435,6 +436,11 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        return new ApplyState(null, null, entry);
+    }
+
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java
new file mode 100644 (file)
index 0000000..a323fba
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+public abstract class IdentifiablePayload<T extends Identifier> extends Payload implements Identifiable<T> {
+}
index b970ba4..fc65743 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
index 5a59b0d..3665fad 100644 (file)
@@ -325,7 +325,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
         final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
         final PruningDataTreeModification mod = createPruningModification(unwrapped,
             NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
@@ -386,7 +386,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private void applyReplicatedCandidate(final CommitTransactionPayload payload)
             throws DataValidationFailedException, IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
         final TransactionIdentifier identifier = entry.getKey();
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -431,7 +431,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 applyReplicatedCandidate((CommitTransactionPayload) payload);
             } else {
                 verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                // if we did not track this transaction before, it means that it came from another leader and we are in
+                // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
+                // the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+                if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+                    applyReplicatedCandidate((CommitTransactionPayload) payload);
+                }
             }
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
@@ -480,22 +485,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void payloadReplicationComplete(final TransactionIdentifier txId) {
+    private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
             allMetadataCommittedTransaction(txId);
-            return;
+            return false;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
             allMetadataCommittedTransaction(txId);
-            return;
+            return false;
         }
 
         finishCommit(current.cohort);
+        return true;
     }
 
     private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
index 4ca3509..32e155b 100644 (file)
@@ -18,6 +18,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -27,8 +28,8 @@ import org.opendaylight.yangtools.concepts.Identifier;
  *
  * @author Robert Varga
  */
-public abstract class AbstractIdentifiablePayload<T extends Identifier>
-        extends Payload implements Identifiable<T>, Serializable {
+public abstract class AbstractIdentifiablePayload<T extends Identifier> extends IdentifiablePayload<T>
+        implements Serializable {
     protected abstract static class AbstractProxy<T extends Identifier> implements Externalizable {
         private static final long serialVersionUID = 1L;
         private byte[] serialized;
index f4ac854..65a65ea 100644 (file)
@@ -29,7 +29,7 @@ import java.util.Map.Entry;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
 import org.opendaylight.yangtools.concepts.Variant;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -44,10 +44,13 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 @Beta
-public abstract class CommitTransactionPayload extends Payload implements Serializable {
+public abstract class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
+        implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
+    private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
+
     CommitTransactionPayload() {
 
     }
@@ -79,7 +82,16 @@ public abstract class CommitTransactionPayload extends Payload implements Serial
     }
 
     public @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate() throws IOException {
-        return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
+        Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = candidate;
+        if (localCandidate == null) {
+            synchronized (this) {
+                localCandidate = candidate;
+                if (localCandidate == null) {
+                    candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
+                }
+            }
+        }
+        return localCandidate;
     }
 
     public final @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate(
@@ -89,6 +101,26 @@ public abstract class CommitTransactionPayload extends Payload implements Serial
                 DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
     }
 
+    @Override
+    public TransactionIdentifier getIdentifier() {
+        try  {
+            return getCandidate().getKey();
+        } catch (IOException e) {
+            throw new IllegalStateException("Candidate deserialization failed.", e);
+        }
+    }
+
+    /**
+     * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
+     * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
+     * this was the last time the candidate was needed ant it is safe to be cleared.
+     */
+    public Entry<TransactionIdentifier, DataTreeCandidateWithVersion> acquireCandidate() throws IOException {
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = getCandidate();
+        candidate = null;
+        return localCandidate;
+    }
+
     abstract void writeBytes(ObjectOutput out) throws IOException;
 
     abstract DataInput newDataInput();
index 270c99d..ac87090 100644 (file)
@@ -36,7 +36,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class ShardInformation {
+@VisibleForTesting
+public final class ShardInformation {
     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
 
     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
@@ -89,7 +90,8 @@ final class ShardInformation {
         return shardName;
     }
 
-    @Nullable ActorRef getActor() {
+    @VisibleForTesting
+    @Nullable public ActorRef getActor() {
         return actor;
     }
 
index fee5554..2cc30a2 100644 (file)
@@ -357,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         configUpdateHandler.initListener(dataStore, datastoreType);
     }
 
-    private void onShutDown() {
+    void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
index 919e4d8..9b790ce 100644 (file)
@@ -12,13 +12,17 @@ import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.LocalShardStore;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShards;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
-public class TestClientBackedDataStore extends ClientBackedDataStore {
+public class TestClientBackedDataStore extends ClientBackedDataStore implements LocalShardStore {
+
     public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
                                      final Configuration configuration,
                                      final DatastoreContextFactory datastoreContextFactory,
@@ -35,4 +39,10 @@ public class TestClientBackedDataStore extends ClientBackedDataStore {
     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
         return new TestShardManager.TestShardManagerCreator();
     }
+
+    @Override
+    public GetLocalShardsReply getLocalShards() {
+        final ActorUtils utils = getActorUtils();
+        return (GetLocalShardsReply) utils.executeOperation(utils.getShardManager(), GetLocalShards.INSTANCE);
+    }
 }
index e403bbc..0c74c71 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -46,7 +47,10 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import org.junit.After;
@@ -67,6 +71,8 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
+import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
+import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -84,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -1364,6 +1371,70 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
     }
 
+    @SuppressWarnings("IllegalCatch")
+    @Test
+    public void testRaftCallbackDuringLeadershipDrop() throws Exception {
+        final String testName = "testRaftCallbackDuringLeadershipDrop";
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
+                        .shardLeaderElectionTimeoutInSeconds(3600),
+                commitTimeout);
+
+        final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        leaderTestKit.doCommit(initialWriteTx.ready());
+
+        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+                testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
+
+            final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
+            member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
+
+            final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+            newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            final AtomicBoolean submitDone = new AtomicBoolean(false);
+            executor.submit(() -> {
+                try {
+                    leaderTestKit.doCommit(newTx.ready());
+                    submitDone.set(true);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
+                    .getLocalShards().get("cars").getActor();
+            await().atMost(10, TimeUnit.SECONDS)
+                    .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                            .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
+
+            final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
+                    .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
+
+            // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
+            // new term(switching to candidate after election timeout)
+            leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
+                    "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
+                            -1), member3Cars);
+
+            member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
+            member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
+
+            await("Is tx stuck in COMMIT_PENDING")
+                    .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
+
+        }
+
+        executor.shutdownNow();
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java
new file mode 100644 (file)
index 0000000..9d34902
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
+
+public interface LocalShardStore {
+
+    GetLocalShardsReply getLocalShards();
+}
index 3606414..882d0e1 100644 (file)
@@ -15,7 +15,7 @@ import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardM
 import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
-public class TestDistributedDataStore extends DistributedDataStore {
+public class TestDistributedDataStore extends DistributedDataStore implements LocalShardStore {
 
     public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
                                     final Configuration configuration,
@@ -32,4 +32,13 @@ public class TestDistributedDataStore extends DistributedDataStore {
     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
         return new TestShardManager.TestShardManagerCreator();
     }
+
+    @Override
+    public TestShardManager.GetLocalShardsReply getLocalShards() {
+        TestShardManager.GetLocalShardsReply reply =
+            (TestShardManager.GetLocalShardsReply) getActorUtils()
+                .executeOperation(getActorUtils().getShardManager(), TestShardManager.GetLocalShards.INSTANCE);
+
+        return reply;
+    }
 }
index b0e744a..9eb20c0 100644 (file)
@@ -7,14 +7,51 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 
 public class TestShard extends Shard {
+    public static class Builder extends Shard.Builder {
+        Builder() {
+            super(TestShard.class);
+        }
+    }
+
     // Message to request FrontendMetadata
     public static final class RequestFrontendMetadata {
 
     }
 
+    private abstract static class DropMessages<T> {
+        private final Class<T> msgClass;
+
+        DropMessages(final Class<T> msgClass) {
+            this.msgClass = requireNonNull(msgClass);
+        }
+
+        final Class<T> getMsgClass() {
+            return msgClass;
+        }
+    }
+
+    public static class StartDropMessages<T> extends DropMessages<T> {
+        public StartDropMessages(final Class<T> msgClass) {
+            super(msgClass);
+        }
+    }
+
+    public static class StopDropMessages<T> extends DropMessages<T> {
+        public StopDropMessages(final Class<T> msgClass) {
+            super(msgClass);
+        }
+    }
+
+    private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
+
     protected TestShard(AbstractBuilder<?, ?> builder) {
         super(builder);
     }
@@ -29,14 +66,37 @@ public class TestShard extends Shard {
         }
     }
 
-    public static Shard.Builder builder() {
-        return new TestShard.Builder();
+    @Override
+    protected void handleCommand(Object message) {
+        if (message instanceof StartDropMessages) {
+            startDropMessages(((StartDropMessages<?>) message).getMsgClass());
+        } else if (message instanceof StopDropMessages) {
+            stopDropMessages(((StopDropMessages<?>) message).getMsgClass());
+        } else {
+            dropOrHandle(message);
+        }
     }
 
-    public static class Builder extends Shard.Builder {
-        Builder() {
-            super(TestShard.class);
+    private <T> void dropOrHandle(T message) {
+        Predicate<T> drop = (Predicate<T>) dropMessages.get(message.getClass());
+        if (drop == null || !drop.test(message)) {
+            super.handleCommand(message);
         }
     }
-}
 
+    private void startDropMessages(final Class<?> msgClass) {
+        dropMessages.put(msgClass, msg -> true);
+    }
+
+    <T> void startDropMessages(final Class<T> msgClass, final Predicate<T> filter) {
+        dropMessages.put(msgClass, filter);
+    }
+
+    public void stopDropMessages(final Class<?> msgClass) {
+        dropMessages.remove(msgClass);
+    }
+
+    public static TestShard.Builder builder() {
+        return new TestShard.Builder();
+    }
+}
index 0783c16..337c4c9 100644 (file)
@@ -20,6 +20,15 @@ public class TestShardManager extends ShardManager {
         super(builder);
     }
 
+    @Override
+    public void handleCommand(Object message) throws Exception {
+        if (GetLocalShards.INSTANCE.equals(message)) {
+            sender().tell(new GetLocalShardsReply(localShards), null);
+        } else {
+            super.handleCommand(message);
+        }
+    }
+
     /**
      * Plug into shard actor creation to replace info with our testing one.
      * @param info shard info.
@@ -27,10 +36,12 @@ public class TestShardManager extends ShardManager {
      */
     @Override
     protected ActorRef newShardActor(ShardInformation info) {
+        Map<String, String> peerAddresses = getPeerAddresses(info.getShardName());
         ShardInformation newInfo = new ShardInformation(info.getShardName(),
-                info.getShardId(), getPeerAddresses(info.getShardName()),
+                info.getShardId(), peerAddresses,
                 info.getDatastoreContext(),
-                TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
+                TestShard.builder()
+                        .restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
                 peerAddressResolver);
         newInfo.setSchemaContext(info.getSchemaContext());
         newInfo.setActiveMember(info.isActiveMember());
@@ -58,4 +69,25 @@ public class TestShardManager extends ShardManager {
             return Props.create(TestShardManager.class, this);
         }
     }
+
+    public static final class GetLocalShards {
+        public static final GetLocalShards INSTANCE = new GetLocalShards();
+
+        private GetLocalShards() {
+
+        }
+    }
+
+    public static class GetLocalShardsReply {
+
+        private final Map<String, ShardInformation> localShards;
+
+        public GetLocalShardsReply(Map<String, ShardInformation> localShards) {
+            this.localShards = localShards;
+        }
+
+        public Map<String, ShardInformation> getLocalShards() {
+            return localShards;
+        }
+    }
 }