From: Tomas Cere Date: Tue, 18 Jun 2019 12:54:56 +0000 (+0200) Subject: Leader should always apply modifications as local X-Git-Tag: release/magnesium~14 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9b319f491af1c65705b69e8a182aab5006a2f959 Leader should always apply modifications as local Normally an entry application is as follows: 1. leader sends an append entry off to persistence and replicates it to followers 2. leader creates its ClientRequestTracker 3. when the entry is done with persistence and replication, the leader moves its commit index 4. part of moving the commit index is sending an ApplyState message which finalizes the entry application in the DataTree 5. The ApplyState determines if a ClientRequestTracker is present and adds an identifier to the ApplyState message if it is. This determines the way in which the finalization of the entry application happens in the DataTree. If it is present, the entry is applied as if it originated on the leader; if it is not present, it is applied as if the node is a follower. The problem is when the leader flaps in a leader -> follower -> leader transition after step 2 and before step 4. This would mean that the new leader no longer has the ClientRequestTracker which was created in the previous leader state, which means that when it reaches step 5 it will create the ApplyState without an identifier and the entry finishes up the application as if the node is a follower. This means that it will be applied without finishCommit, which means that the transaction will be stuck in COMMIT_PENDING state until the node is restarted. Change this up, so that the leader will apply modifications as local, even when it loses its ClientRequestTracker, and add Identifiable to payloads which require it. Since this code path should never occur when we are a candidate, catch this transition. As ClientRequestTracker becomes an optional entity, we hide that as well. 
JIRA: CONTROLLER-1927 Change-Id: I636f998cd62ec82ef02193261624e4a51275fb86 Signed-off-by: Tomas Cere Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 32e2e09c3f..6560ad76c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -55,6 +56,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import scala.concurrent.duration.FiniteDuration; /** @@ -433,8 +436,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } - @Override - 
protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { + /** + * Removes and returns the ClientRequestTracker for the specified log index. + * @param logIndex the log index + * @return the ClientRequestTracker or null if none available + */ + private ClientRequestTracker removeClientRequestTracker(final long logIndex) { final Iterator it = trackers.iterator(); while (it.hasNext()) { final ClientRequestTracker t = it.next(); @@ -447,6 +454,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return null; } + @Override + final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { + // first check whether a ClientRequestTracker exists for this entry. + // If it does, that means the leader wasn't dropped before the transaction applied. + // That means that this transaction can be safely applied as a local transaction since we + // have the ClientRequestTracker. + final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex()); + if (tracker != null) { + return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry); + } + + // Tracker is missing, this means that we switched behaviours between replicate and applystate + // and became the leader again. We still want to apply this as a local modification because + // we have resumed leadership with that log entry having been committed. 
+ final Payload payload = entry.getData(); + if (payload instanceof IdentifiablePayload) { + return new ApplyState(null, ((IdentifiablePayload) payload).getIdentifier(), entry); + } + + return new ApplyState(null, null, entry); + } + @Override protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) { return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 3440de9dd9..fd2fbd332c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -18,7 +18,6 @@ import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -324,15 +323,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getReplicatedLog().lastIndex(); } - /** - * Removes and returns the ClientRequestTracker for the specified log index. - * @param logIndex the log index - * @return the ClientRequestTracker or null if none available - */ - protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { - return null; - } - /** * Returns the actual index of the entry in replicated log for the given index or -1 if not found. 
* @@ -397,13 +387,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - final ApplyState applyState; - final ClientRequestTracker tracker = removeClientRequestTracker(i); - if (tracker != null) { - applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); - } else { - applyState = new ApplyState(null, null, replicatedLogEntry); - } + final ApplyState applyState = getApplyStateFor(replicatedLogEntry); log.debug("{}: Setting last applied to {}", logName(), i); @@ -425,6 +409,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } + /** + * Create an ApplyState message for a particular log entry so we can determine how to apply this entry. + * + * @param entry the log entry + * @return ApplyState for this entry + */ + abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry); + @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof AppendEntries) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index afa46892be..a8762ec76e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -15,6 +15,8 @@ import java.util.Collection; import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import 
org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -50,7 +52,7 @@ public class Candidate extends AbstractRaftActorBehavior { private final Collection votingPeers = new ArrayList<>(); - public Candidate(RaftActorContext context) { + public Candidate(final RaftActorContext context) { super(context, RaftState.Candidate); for (PeerInfo peer: context.getPeers()) { @@ -83,7 +85,7 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { + protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) { log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); @@ -99,12 +101,13 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { + protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, + final AppendEntriesReply appendEntriesReply) { return this; } @Override - protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { + protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) { log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount); if (requestVoteReply.isVoteGranted()) { @@ -129,8 +132,14 @@ public class Candidate extends AbstractRaftActorBehavior { return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor()); } + + @Override + final ApplyState 
getApplyStateFor(final ReplicatedLogEntry entry) { + throw new IllegalStateException("A candidate should never attempt to apply " + entry); + } + @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout) { log.debug("{}: Received ElectionTimeout", logName()); @@ -178,10 +187,7 @@ public class Candidate extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private void startNewTerm() { - - // set voteCount back to 1 (that is voting for self) voteCount = 1; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index d88c30db33..b642ee43a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -435,6 +436,11 @@ public class Follower extends AbstractRaftActorBehavior { return this; } + @Override + final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { + return new ApplyState(null, null, entry); + } + @Override public 
RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout || message instanceof TimeoutNow) { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java new file mode 100644 index 0000000000..a323fba817 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.protobuff.client.messages; + +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.concepts.Identifier; + +public abstract class IdentifiablePayload extends Payload implements Identifiable { +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java index b970ba4485..fc65743e7b 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. 
All rights reserved. + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java index 4ca35098c5..32e155bf03 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java @@ -18,6 +18,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.Identifier; @@ -27,8 +28,8 @@ import org.opendaylight.yangtools.concepts.Identifier; * * @author Robert Varga */ -public abstract class AbstractIdentifiablePayload - extends Payload implements Identifiable, Serializable { +public abstract class AbstractIdentifiablePayload extends IdentifiablePayload + implements Serializable { protected abstract static class AbstractProxy implements Externalizable { private static final long serialVersionUID = 1L; private byte[] serialized; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index 73bdd6f31b..b2164133b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -27,7 +27,7 @@ import java.io.StreamCorruptedException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; import org.opendaylight.yangtools.concepts.Variant; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -42,10 +42,13 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public abstract class CommitTransactionPayload extends Payload implements Serializable { +public abstract class CommitTransactionPayload extends IdentifiablePayload + implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); private static final long serialVersionUID = 1L; + private volatile Entry candidate = null; + CommitTransactionPayload() { } @@ -71,7 +74,16 @@ public abstract class CommitTransactionPayload extends Payload implements Serial } public Entry getCandidate() throws IOException { - return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + Entry localCandidate = candidate; + if (localCandidate == null) { + synchronized (this) { + localCandidate = candidate; + if (localCandidate == null) { + candidate = 
localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + } + } + } + return localCandidate; } public final Entry getCandidate( @@ -81,6 +93,14 @@ public abstract class CommitTransactionPayload extends Payload implements Serial DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); } + public TransactionIdentifier getIdentifier() { + try { + return getCandidate().getKey(); + } catch (IOException e) { + throw new IllegalStateException("Candidate deserialization failed.", e); + } + } + abstract void writeBytes(ObjectOutput out) throws IOException; abstract DataInput newDataInput(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java index 270c99d86c..ac870905ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java @@ -36,7 +36,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class ShardInformation { +@VisibleForTesting +public final class ShardInformation { private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class); private final Set onShardInitializedSet = new HashSet<>(); @@ -89,7 +90,8 @@ final class ShardInformation { return shardName; } - @Nullable ActorRef getActor() { + @VisibleForTesting + @Nullable public ActorRef getActor() { return actor; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index fee555470f..2cc30a271c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -357,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { configUpdateHandler.initListener(dataStore, datastoreType); } - private void onShutDown() { + void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java index 919e4d82c7..9b790ce5c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java @@ -12,13 +12,17 @@ import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.LocalShardStore; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import 
org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator; import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShards; +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -public class TestClientBackedDataStore extends ClientBackedDataStore { +public class TestClientBackedDataStore extends ClientBackedDataStore implements LocalShardStore { + public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, final Configuration configuration, final DatastoreContextFactory datastoreContextFactory, @@ -35,4 +39,10 @@ public class TestClientBackedDataStore extends ClientBackedDataStore { protected AbstractShardManagerCreator getShardManagerCreator() { return new TestShardManager.TestShardManagerCreator(); } + + @Override + public GetLocalShardsReply getLocalShards() { + final ActorUtils utils = getActorUtils(); + return (GetLocalShardsReply) utils.executeOperation(utils.getShardManager(), GetLocalShards.INSTANCE); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index e403bbc20a..0c74c71d83 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import 
static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -46,7 +47,10 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.junit.After; @@ -67,6 +71,8 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; +import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; +import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -84,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import 
org.opendaylight.controller.cluster.raft.persisted.Snapshot; @@ -1364,6 +1371,70 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } + @SuppressWarnings("IllegalCatch") + @Test + public void testRaftCallbackDuringLeadershipDrop() throws Exception { + final String testName = "testRaftCallbackDuringLeadershipDrop"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) + .shardLeaderElectionTimeoutInSeconds(3600), + commitTimeout); + + final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + leaderTestKit.doCommit(initialWriteTx.ready()); + + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + + final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + member2Cars.tell(new StartDropMessages(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + + final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + final AtomicBoolean submitDone = new AtomicBoolean(false); + executor.submit(() -> { + try { + leaderTestKit.doCommit(newTx.ready()); + submitDone.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + 
final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); + + final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); + + // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with + // new term(switching to candidate after election timeout) + leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, + "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, + -1), member3Cars); + + member2Cars.tell(new StopDropMessages(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + + await("Is tx stuck in COMMIT_PENDING") + .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); + + } + + executor.shutdownNow(); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java new file mode 100644 index 0000000000..9d3490263c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply; + +public interface LocalShardStore { + + GetLocalShardsReply getLocalShards(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java index 36064145dd..882d0e176b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java @@ -15,7 +15,7 @@ import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardM import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -public class TestDistributedDataStore extends DistributedDataStore { +public class TestDistributedDataStore extends DistributedDataStore implements LocalShardStore { public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, final Configuration configuration, @@ -32,4 +32,13 @@ public class TestDistributedDataStore extends DistributedDataStore { protected AbstractShardManagerCreator getShardManagerCreator() { return new TestShardManager.TestShardManagerCreator(); } + + @Override + public TestShardManager.GetLocalShardsReply getLocalShards() { + TestShardManager.GetLocalShardsReply reply = + 
(TestShardManager.GetLocalShardsReply) getActorUtils() + .executeOperation(getActorUtils().getShardManager(), TestShardManager.GetLocalShards.INSTANCE); + + return reply; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java index b0e744a24a..9eb20c0396 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java @@ -7,14 +7,51 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; public class TestShard extends Shard { + public static class Builder extends Shard.Builder { + Builder() { + super(TestShard.class); + } + } + // Message to request FrontendMetadata public static final class RequestFrontendMetadata { } + private abstract static class DropMessages { + private final Class msgClass; + + DropMessages(final Class msgClass) { + this.msgClass = requireNonNull(msgClass); + } + + final Class getMsgClass() { + return msgClass; + } + } + + public static class StartDropMessages extends DropMessages { + public StartDropMessages(final Class msgClass) { + super(msgClass); + } + } + + public static class StopDropMessages extends DropMessages { + public StopDropMessages(final Class msgClass) { + super(msgClass); + } + } + + private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>(); + protected TestShard(AbstractBuilder builder) { super(builder); } @@ -29,14 +66,37 @@ public class TestShard 
extends Shard { } } - public static Shard.Builder builder() { - return new TestShard.Builder(); + @Override + protected void handleCommand(Object message) { + if (message instanceof StartDropMessages) { + startDropMessages(((StartDropMessages) message).getMsgClass()); + } else if (message instanceof StopDropMessages) { + stopDropMessages(((StopDropMessages) message).getMsgClass()); + } else { + dropOrHandle(message); + } } - public static class Builder extends Shard.Builder { - Builder() { - super(TestShard.class); + private <T> void dropOrHandle(T message) { + Predicate<T> drop = (Predicate<T>) dropMessages.get(message.getClass()); + if (drop == null || !drop.test(message)) { + super.handleCommand(message); } } -} + private void startDropMessages(final Class msgClass) { + dropMessages.put(msgClass, msg -> true); + } + + void startDropMessages(final Class msgClass, final Predicate filter) { + dropMessages.put(msgClass, filter); + } + + public void stopDropMessages(final Class msgClass) { + dropMessages.remove(msgClass); + } + + public static TestShard.Builder builder() { + return new TestShard.Builder(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java index 0783c16464..337c4c95b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java @@ -20,6 +20,15 @@ public class TestShardManager extends ShardManager { super(builder); } + @Override + public void handleCommand(Object message) throws Exception { + if (GetLocalShards.INSTANCE.equals(message)) { + sender().tell(new 
GetLocalShardsReply(localShards), null); + } else { + super.handleCommand(message); + } + } + /** * Plug into shard actor creation to replace info with our testing one. * @param info shard info. @@ -27,10 +36,12 @@ public class TestShardManager extends ShardManager { */ @Override protected ActorRef newShardActor(ShardInformation info) { + Map peerAddresses = getPeerAddresses(info.getShardName()); ShardInformation newInfo = new ShardInformation(info.getShardName(), - info.getShardId(), getPeerAddresses(info.getShardName()), + info.getShardId(), peerAddresses, info.getDatastoreContext(), - TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()), + TestShard.builder() + .restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()), peerAddressResolver); newInfo.setSchemaContext(info.getSchemaContext()); newInfo.setActiveMember(info.isActiveMember()); @@ -58,4 +69,25 @@ public class TestShardManager extends ShardManager { return Props.create(TestShardManager.class, this); } } + + public static final class GetLocalShards { + public static final GetLocalShards INSTANCE = new GetLocalShards(); + + private GetLocalShards() { + + } + } + + public static class GetLocalShardsReply { + + private final Map localShards; + + public GetLocalShardsReply(Map localShards) { + this.localShards = localShards; + } + + public Map getLocalShards() { + return localShards; + } + } }