From 78d3eea7d730f07f89c36fe24afbf51781a21bc3 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Wed, 12 Feb 2020 13:23:22 +0100 Subject: [PATCH] Fixup "Leader should always apply modifications as local" regression This reintroduces the original commit: e66759266dc43d5f58b2837aca5047b42c205e4a and fixes up the two bugs it introduced. First one didn't account for the fact that the new leader might have not tracked forwarded transactions from the previous leader which would be ignored. Previously not tracked transactions are now applied normally on the new leader. Second issue was that we started keeping around deserialized candidates in memory in each CommitTransactionPayload even after they were applied and no longer needed.During the final use of the candidate in ShardDataTree the candidate is now clearedduring CommitTransactionPayload.acquireCandidate() which allows the deserialized candidate to be cleared by GC. JIRA: CONTROLLER-1927 JIRA: CONTROLLER-1928 Change-Id: I47876d4d29a8e9b6b9283d70e47108f3dc2ed3ee Signed-off-by: Tomas Cere --- .../raft/behaviors/AbstractLeader.java | 33 ++++++++- .../behaviors/AbstractRaftActorBehavior.java | 26 +++---- .../cluster/raft/behaviors/Candidate.java | 22 +++--- .../cluster/raft/behaviors/Follower.java | 6 ++ .../client/messages/IdentifiablePayload.java | 15 ++++ .../protobuff/client/messages/Payload.java | 2 +- .../cluster/datastore/ShardDataTree.java | 18 +++-- .../AbstractIdentifiablePayload.java | 5 +- .../persisted/CommitTransactionPayload.java | 38 +++++++++- .../shardmanager/ShardInformation.java | 6 +- .../datastore/shardmanager/ShardManager.java | 2 +- .../databroker/TestClientBackedDataStore.java | 12 +++- ...butedDataStoreRemotingIntegrationTest.java | 71 ++++++++++++++++++ .../cluster/datastore/LocalShardStore.java | 15 ++++ .../datastore/TestDistributedDataStore.java | 11 ++- .../cluster/datastore/TestShard.java | 72 +++++++++++++++++-- .../shardmanager/TestShardManager.java | 36 +++++++++- 17 files changed, 338 insertions(+), 52 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 32e2e09c3f..6560ad76c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -55,6 +56,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import scala.concurrent.duration.FiniteDuration; /** @@ -433,8 +436,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } - @Override - protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { + /** + * Removes and returns the ClientRequestTracker for the specified log index. + * @param logIndex the log index + * @return the ClientRequestTracker or null if none available + */ + private ClientRequestTracker removeClientRequestTracker(final long logIndex) { final Iterator it = trackers.iterator(); while (it.hasNext()) { final ClientRequestTracker t = it.next(); @@ -447,6 +454,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return null; } + @Override + final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { + // first check whether a ClientRequestTracker exists for this entry. + // If it does that means the leader wasn't dropped before the transaction applied. + // That means that this transaction can be safely applied as a local transaction since we + // have the ClientRequestTracker. + final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex()); + if (tracker != null) { + return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry); + } + + // Tracker is missing, this means that we switched behaviours between replicate and applystate + // and became the leader again,. We still want to apply this as a local modification because + // we have resumed leadership with that log entry having been committed. + final Payload payload = entry.getData(); + if (payload instanceof IdentifiablePayload) { + return new ApplyState(null, ((IdentifiablePayload) payload).getIdentifier(), entry); + } + + return new ApplyState(null, null, entry); + } + @Override protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) { return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 3440de9dd9..fd2fbd332c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -18,7 +18,6 @@ import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -324,15 +323,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getReplicatedLog().lastIndex(); } - /** - * Removes and returns the ClientRequestTracker for the specified log index. - * @param logIndex the log index - * @return the ClientRequestTracker or null if none available - */ - protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { - return null; - } - /** * Returns the actual index of the entry in replicated log for the given index or -1 if not found. * @@ -397,13 +387,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - final ApplyState applyState; - final ClientRequestTracker tracker = removeClientRequestTracker(i); - if (tracker != null) { - applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); - } else { - applyState = new ApplyState(null, null, replicatedLogEntry); - } + final ApplyState applyState = getApplyStateFor(replicatedLogEntry); log.debug("{}: Setting last applied to {}", logName(), i); @@ -425,6 +409,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } + /** + * Create an ApplyState message for a particular log entry so we can determine how to apply this entry. + * + * @param entry the log entry + * @return ApplyState for this entry + */ + abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry); + @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof AppendEntries) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index afa46892be..a8762ec76e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -15,6 +15,8 @@ import java.util.Collection; import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -50,7 +52,7 @@ public class Candidate extends AbstractRaftActorBehavior { private final Collection votingPeers = new ArrayList<>(); - public Candidate(RaftActorContext context) { + public Candidate(final RaftActorContext context) { super(context, RaftState.Candidate); for (PeerInfo peer: context.getPeers()) { @@ -83,7 +85,7 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { + protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) { log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); @@ -99,12 +101,13 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { + protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, + final AppendEntriesReply appendEntriesReply) { return this; } @Override - protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { + protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) { log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount); if (requestVoteReply.isVoteGranted()) { @@ -129,8 +132,14 @@ public class Candidate extends AbstractRaftActorBehavior { return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor()); } + + @Override + final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { + throw new IllegalStateException("A candidate should never attempt to apply " + entry); + } + @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout) { log.debug("{}: Received ElectionTimeout", logName()); @@ -178,10 +187,7 @@ public class Candidate extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private void startNewTerm() { - - // set voteCount back to 1 (that is voting for self) voteCount = 1; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index d88c30db33..b642ee43a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -435,6 +436,11 @@ public class Follower extends AbstractRaftActorBehavior { return this; } + @Override + final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { + return new ApplyState(null, null, entry); + } + @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout || message instanceof TimeoutNow) { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java new file mode 100644 index 0000000000..a323fba817 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.protobuff.client.messages; + +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.concepts.Identifier; + +public abstract class IdentifiablePayload extends Payload implements Identifiable { +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java index b970ba4485..fc65743e7b 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 5a59b0d163..3665fad559 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -325,7 +325,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException { - final Entry entry = payload.getCandidate(); + final Entry entry = payload.acquireCandidate(); final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); final PruningDataTreeModification mod = createPruningModification(unwrapped, NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0); @@ -386,7 +386,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private void applyReplicatedCandidate(final CommitTransactionPayload payload) throws DataValidationFailedException, IOException { - final Entry entry = payload.getCandidate(); + final Entry entry = payload.acquireCandidate(); final TransactionIdentifier identifier = entry.getKey(); LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); @@ -431,7 +431,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { applyReplicatedCandidate((CommitTransactionPayload) payload); } else { verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + // if we did not track this transaction before, it means that it came from another leader and we are in + // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to + // the local DataTree and would be lost if it was only applied via payloadReplicationComplete(). + if (!payloadReplicationComplete((TransactionIdentifier) identifier)) { + applyReplicatedCandidate((CommitTransactionPayload) payload); + } } } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { @@ -480,22 +485,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void payloadReplicationComplete(final TransactionIdentifier txId) { + private boolean payloadReplicationComplete(final TransactionIdentifier txId) { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); allMetadataCommittedTransaction(txId); - return; + return false; } if (!current.cohort.getIdentifier().equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); allMetadataCommittedTransaction(txId); - return; + return false; } finishCommit(current.cohort); + return true; } private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java index 4ca35098c5..32e155bf03 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java @@ -18,6 +18,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.Identifier; @@ -27,8 +28,8 @@ import org.opendaylight.yangtools.concepts.Identifier; * * @author Robert Varga */ -public abstract class AbstractIdentifiablePayload - extends Payload implements Identifiable, Serializable { +public abstract class AbstractIdentifiablePayload extends IdentifiablePayload + implements Serializable { protected abstract static class AbstractProxy implements Externalizable { private static final long serialVersionUID = 1L; private byte[] serialized; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index f4ac854a2c..65a65ea620 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -29,7 +29,7 @@ import java.util.Map.Entry; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; import org.opendaylight.yangtools.concepts.Variant; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -44,10 +44,13 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public abstract class CommitTransactionPayload extends Payload implements Serializable { +public abstract class CommitTransactionPayload extends IdentifiablePayload + implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); private static final long serialVersionUID = 1L; + private volatile Entry candidate = null; + CommitTransactionPayload() { } @@ -79,7 +82,16 @@ public abstract class CommitTransactionPayload extends Payload implements Serial } public @NonNull Entry getCandidate() throws IOException { - return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + Entry localCandidate = candidate; + if (localCandidate == null) { + synchronized (this) { + localCandidate = candidate; + if (localCandidate == null) { + candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + } + } + } + return localCandidate; } public final @NonNull Entry getCandidate( @@ -89,6 +101,26 @@ public abstract class CommitTransactionPayload extends Payload implements Serial DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); } + @Override + public TransactionIdentifier getIdentifier() { + try { + return getCandidate().getKey(); + } catch (IOException e) { + throw new IllegalStateException("Candidate deserialization failed.", e); + } + } + + /** + * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping + * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that + * this was the last time the candidate was needed ant it is safe to be cleared. + */ + public Entry acquireCandidate() throws IOException { + final Entry localCandidate = getCandidate(); + candidate = null; + return localCandidate; + } + abstract void writeBytes(ObjectOutput out) throws IOException; abstract DataInput newDataInput(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java index 270c99d86c..ac870905ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java @@ -36,7 +36,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class ShardInformation { +@VisibleForTesting +public final class ShardInformation { private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class); private final Set onShardInitializedSet = new HashSet<>(); @@ -89,7 +90,8 @@ final class ShardInformation { return shardName; } - @Nullable ActorRef getActor() { + @VisibleForTesting + @Nullable public ActorRef getActor() { return actor; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index fee555470f..2cc30a271c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -357,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { configUpdateHandler.initListener(dataStore, datastoreType); } - private void onShutDown() { + void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java index 919e4d82c7..9b790ce5c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java @@ -12,13 +12,17 @@ import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.LocalShardStore; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator; import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShards; +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -public class TestClientBackedDataStore extends ClientBackedDataStore { +public class TestClientBackedDataStore extends ClientBackedDataStore implements LocalShardStore { + public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, final Configuration configuration, final DatastoreContextFactory datastoreContextFactory, @@ -35,4 +39,10 @@ public class TestClientBackedDataStore extends ClientBackedDataStore { protected AbstractShardManagerCreator getShardManagerCreator() { return new TestShardManager.TestShardManagerCreator(); } + + @Override + public GetLocalShardsReply getLocalShards() { + final ActorUtils utils = getActorUtils(); + return (GetLocalShardsReply) utils.executeOperation(utils.getShardManager(), GetLocalShards.INSTANCE); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index e403bbc20a..0c74c71d83 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -46,7 +47,10 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.junit.After; @@ -67,6 +71,8 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; +import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; +import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -84,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; @@ -1364,6 +1371,70 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } + @SuppressWarnings("IllegalCatch") + @Test + public void testRaftCallbackDuringLeadershipDrop() throws Exception { + final String testName = "testRaftCallbackDuringLeadershipDrop"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) + .shardLeaderElectionTimeoutInSeconds(3600), + commitTimeout); + + final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + leaderTestKit.doCommit(initialWriteTx.ready()); + + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + + final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + member2Cars.tell(new StartDropMessages(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + + final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + final AtomicBoolean submitDone = new AtomicBoolean(false); + executor.submit(() -> { + try { + leaderTestKit.doCommit(newTx.ready()); + submitDone.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); + + final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); + + // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with + // new term(switching to candidate after election timeout) + leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, + "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, + -1), member3Cars); + + member2Cars.tell(new StopDropMessages(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + + await("Is tx stuck in COMMIT_PENDING") + .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); + + } + + executor.shutdownNow(); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java new file mode 100644 index 0000000000..9d3490263c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply; + +public interface LocalShardStore { + + GetLocalShardsReply getLocalShards(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java index 36064145dd..882d0e176b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java @@ -15,7 +15,7 @@ import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardM import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -public class TestDistributedDataStore extends DistributedDataStore { +public class TestDistributedDataStore extends DistributedDataStore implements LocalShardStore { public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, final Configuration configuration, @@ -32,4 +32,13 @@ public class TestDistributedDataStore extends DistributedDataStore { protected AbstractShardManagerCreator getShardManagerCreator() { return new TestShardManager.TestShardManagerCreator(); } + + @Override + public TestShardManager.GetLocalShardsReply getLocalShards() { + TestShardManager.GetLocalShardsReply reply = + (TestShardManager.GetLocalShardsReply) getActorUtils() + .executeOperation(getActorUtils().getShardManager(), TestShardManager.GetLocalShards.INSTANCE); + + return reply; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java index b0e744a24a..9eb20c0396 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java @@ -7,14 +7,51 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; public class TestShard extends Shard { + public static class Builder extends Shard.Builder { + Builder() { + super(TestShard.class); + } + } + // Message to request FrontendMetadata public static final class RequestFrontendMetadata { } + private abstract static class DropMessages { + private final Class msgClass; + + DropMessages(final Class msgClass) { + this.msgClass = requireNonNull(msgClass); + } + + final Class getMsgClass() { + return msgClass; + } + } + + public static class StartDropMessages extends DropMessages { + public StartDropMessages(final Class msgClass) { + super(msgClass); + } + } + + public static class StopDropMessages extends DropMessages { + public StopDropMessages(final Class msgClass) { + super(msgClass); + } + } + + private final Map, Predicate> dropMessages = new ConcurrentHashMap<>(); + protected TestShard(AbstractBuilder builder) { super(builder); } @@ -29,14 +66,37 @@ public class TestShard extends Shard { } } - public static Shard.Builder builder() { - return new TestShard.Builder(); + @Override + protected void handleCommand(Object message) { + if (message instanceof StartDropMessages) { + startDropMessages(((StartDropMessages) message).getMsgClass()); + } else if (message instanceof StopDropMessages) { + stopDropMessages(((StopDropMessages) message).getMsgClass()); + } else { + dropOrHandle(message); + } } - public static class Builder extends Shard.Builder { - Builder() { - super(TestShard.class); + private void dropOrHandle(T message) { + Predicate drop = (Predicate) dropMessages.get(message.getClass()); + if (drop == null || !drop.test(message)) { + super.handleCommand(message); } } -} + private void startDropMessages(final Class msgClass) { + dropMessages.put(msgClass, msg -> true); + } + + void startDropMessages(final Class msgClass, final Predicate filter) { + dropMessages.put(msgClass, filter); + } + + public void stopDropMessages(final Class msgClass) { + dropMessages.remove(msgClass); + } + + public static TestShard.Builder builder() { + return new TestShard.Builder(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java index 0783c16464..337c4c95b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java @@ -20,6 +20,15 @@ public class TestShardManager extends ShardManager { super(builder); } + @Override + public void handleCommand(Object message) throws Exception { + if (GetLocalShards.INSTANCE.equals(message)) { + sender().tell(new GetLocalShardsReply(localShards), null); + } else { + super.handleCommand(message); + } + } + /** * Plug into shard actor creation to replace info with our testing one. * @param info shard info. @@ -27,10 +36,12 @@ public class TestShardManager extends ShardManager { */ @Override protected ActorRef newShardActor(ShardInformation info) { + Map peerAddresses = getPeerAddresses(info.getShardName()); ShardInformation newInfo = new ShardInformation(info.getShardName(), - info.getShardId(), getPeerAddresses(info.getShardName()), + info.getShardId(), peerAddresses, info.getDatastoreContext(), - TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()), + TestShard.builder() + .restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()), peerAddressResolver); newInfo.setSchemaContext(info.getSchemaContext()); newInfo.setActiveMember(info.isActiveMember()); @@ -58,4 +69,25 @@ public class TestShardManager extends ShardManager { return Props.create(TestShardManager.class, this); } } + + public static final class GetLocalShards { + public static final GetLocalShards INSTANCE = new GetLocalShards(); + + private GetLocalShards() { + + } + } + + public static class GetLocalShardsReply { + + private final Map localShards; + + public GetLocalShardsReply(Map localShards) { + this.localShards = localShards; + } + + public Map getLocalShards() { + return localShards; + } + } } -- 2.36.6