From e66759266dc43d5f58b2837aca5047b42c205e4a Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 5 Feb 2020 15:13:46 +0100 Subject: [PATCH] Revert "Leader should always apply modifications as local" This reverts commit 9b319f491af1c65705b69e8a182aab5006a2f959, which broke both upgrade-recovery and pre-leader entry application. We will re-visit the patch and try again. JIRA: CONTROLLER-1927 JIRA: CONTROLLER-1928 Change-Id: I23e4264cf6a20e9d2369d44005c31db1ef7635c9 Signed-off-by: Robert Varga --- .../raft/behaviors/AbstractLeader.java | 33 +-------- .../behaviors/AbstractRaftActorBehavior.java | 26 ++++--- .../cluster/raft/behaviors/Candidate.java | 22 +++--- .../cluster/raft/behaviors/Follower.java | 6 -- .../client/messages/IdentifiablePayload.java | 15 ---- .../protobuff/client/messages/Payload.java | 2 +- .../AbstractIdentifiablePayload.java | 5 +- .../persisted/CommitTransactionPayload.java | 27 +------ .../shardmanager/ShardInformation.java | 6 +- .../datastore/shardmanager/ShardManager.java | 2 +- .../databroker/TestClientBackedDataStore.java | 12 +--- ...butedDataStoreRemotingIntegrationTest.java | 71 ------------------ .../cluster/datastore/LocalShardStore.java | 15 ---- .../datastore/TestDistributedDataStore.java | 11 +-- .../cluster/datastore/TestShard.java | 72 ++----------------- .../shardmanager/TestShardManager.java | 36 +--------- 16 files changed, 46 insertions(+), 315 deletions(-) delete mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 6560ad76c3..32e2e09c3f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -41,7 +41,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.VotingState; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -56,8 +55,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import scala.concurrent.duration.FiniteDuration; /** @@ -436,12 +433,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } - /** - * Removes and returns the ClientRequestTracker for the specified log index. - * @param logIndex the log index - * @return the ClientRequestTracker or null if none available - */ - private ClientRequestTracker removeClientRequestTracker(final long logIndex) { + @Override + protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { final Iterator it = trackers.iterator(); while (it.hasNext()) { final ClientRequestTracker t = it.next(); @@ -454,28 +447,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return null; } - @Override - final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { - // first check whether a ClientRequestTracker exists for this entry. - // If it does that means the leader wasn't dropped before the transaction applied. - // That means that this transaction can be safely applied as a local transaction since we - // have the ClientRequestTracker. - final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex()); - if (tracker != null) { - return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry); - } - - // Tracker is missing, this means that we switched behaviours between replicate and applystate - // and became the leader again,. We still want to apply this as a local modification because - // we have resumed leadership with that log entry having been committed. - final Payload payload = entry.getData(); - if (payload instanceof IdentifiablePayload) { - return new ApplyState(null, ((IdentifiablePayload) payload).getIdentifier(), entry); - } - - return new ApplyState(null, null, entry); - } - @Override protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) { return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index fd2fbd332c..3440de9dd9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -18,6 +18,7 @@ import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -323,6 +324,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getReplicatedLog().lastIndex(); } + /** + * Removes and returns the ClientRequestTracker for the specified log index. + * @param logIndex the log index + * @return the ClientRequestTracker or null if none available + */ + protected ClientRequestTracker removeClientRequestTracker(final long logIndex) { + return null; + } + /** * Returns the actual index of the entry in replicated log for the given index or -1 if not found. * @@ -387,7 +397,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - final ApplyState applyState = getApplyStateFor(replicatedLogEntry); + final ApplyState applyState; + final ClientRequestTracker tracker = removeClientRequestTracker(i); + if (tracker != null) { + applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); + } else { + applyState = new ApplyState(null, null, replicatedLogEntry); + } log.debug("{}: Setting last applied to {}", logName(), i); @@ -409,14 +425,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } - /** - * Create an ApplyState message for a particular log entry so we can determine how to apply this entry. - * - * @param entry the log entry - * @return ApplyState for this entry - */ - abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry); - @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof AppendEntries) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index a8762ec76e..afa46892be 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -15,8 +15,6 @@ import java.util.Collection; import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -52,7 +50,7 @@ public class Candidate extends AbstractRaftActorBehavior { private final Collection votingPeers = new ArrayList<>(); - public Candidate(final RaftActorContext context) { + public Candidate(RaftActorContext context) { super(context, RaftState.Candidate); for (PeerInfo peer: context.getPeers()) { @@ -85,7 +83,7 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) { + protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { log.debug("{}: handleAppendEntries: {}", logName(), appendEntries); @@ -101,13 +99,12 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, - final AppendEntriesReply appendEntriesReply) { + protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { return this; } @Override - protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) { + protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount); if (requestVoteReply.isVoteGranted()) { @@ -132,14 +129,8 @@ public class Candidate extends AbstractRaftActorBehavior { return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor()); } - - @Override - final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { - throw new IllegalStateException("A candidate should never attempt to apply " + entry); - } - @Override - public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof ElectionTimeout) { log.debug("{}: Received ElectionTimeout", logName()); @@ -187,7 +178,10 @@ public class Candidate extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } + private void startNewTerm() { + + // set voteCount back to 1 (that is voting for self) voteCount = 1; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index b642ee43a5..d88c30db33 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -30,7 +30,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -436,11 +435,6 @@ public class Follower extends AbstractRaftActorBehavior { return this; } - @Override - final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) { - return new ApplyState(null, null, entry); - } - @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout || message instanceof TimeoutNow) { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java deleted file mode 100644 index a323fba817..0000000000 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft.protobuff.client.messages; - -import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.concepts.Identifier; - -public abstract class IdentifiablePayload extends Payload implements Identifiable { -} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java index fc65743e7b..b970ba4485 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java index 32e155bf03..4ca35098c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java @@ -18,7 +18,6 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.Identifier; @@ -28,8 +27,8 @@ import org.opendaylight.yangtools.concepts.Identifier; * * @author Robert Varga */ -public abstract class AbstractIdentifiablePayload extends IdentifiablePayload - implements Serializable { +public abstract class AbstractIdentifiablePayload + extends Payload implements Identifiable, Serializable { protected abstract static class AbstractProxy implements Externalizable { private static final long serialVersionUID = 1L; private byte[] serialized; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index af19d14c00..f4ac854a2c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -29,7 +29,7 @@ import java.util.Map.Entry; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Variant; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -44,13 +44,10 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public abstract class CommitTransactionPayload extends IdentifiablePayload - implements Serializable { +public abstract class CommitTransactionPayload extends Payload implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); private static final long serialVersionUID = 1L; - private volatile Entry candidate = null; - CommitTransactionPayload() { } @@ -82,16 +79,7 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload getCandidate() throws IOException { - Entry localCandidate = candidate; - if (localCandidate == null) { - synchronized (this) { - localCandidate = candidate; - if (localCandidate == null) { - candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); - } - } - } - return localCandidate; + return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); } public final @NonNull Entry getCandidate( @@ -101,15 +89,6 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload onShardInitializedSet = new HashSet<>(); @@ -90,8 +89,7 @@ public final class ShardInformation { return shardName; } - @VisibleForTesting - @Nullable public ActorRef getActor() { + @Nullable ActorRef getActor() { return actor; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 2cc30a271c..fee555470f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -357,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { configUpdateHandler.initListener(dataStore, datastoreType); } - void onShutDown() { + private void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java index 9b790ce5c8..919e4d82c7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java @@ -12,17 +12,13 @@ import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; -import org.opendaylight.controller.cluster.datastore.LocalShardStore; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator; import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; -import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShards; -import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -public class TestClientBackedDataStore extends ClientBackedDataStore implements LocalShardStore { - +public class TestClientBackedDataStore extends ClientBackedDataStore { public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, final Configuration configuration, final DatastoreContextFactory datastoreContextFactory, @@ -39,10 +35,4 @@ public class TestClientBackedDataStore extends ClientBackedDataStore implements protected AbstractShardManagerCreator getShardManagerCreator() { return new TestShardManager.TestShardManagerCreator(); } - - @Override - public GetLocalShardsReply getLocalShards() { - final ActorUtils utils = getActorUtils(); - return (GetLocalShardsReply) utils.executeOperation(utils.getShardManager(), GetLocalShards.INSTANCE); - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 0c74c71d83..e403bbc20a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; -import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -47,10 +46,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.junit.After; @@ -71,8 +67,6 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; -import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; -import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -90,7 +84,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; -import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; @@ -1371,70 +1364,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } - @SuppressWarnings("IllegalCatch") - @Test - public void testRaftCallbackDuringLeadershipDrop() throws Exception { - final String testName = "testRaftCallbackDuringLeadershipDrop"; - initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); - - final ExecutorService executor = Executors.newSingleThreadExecutor(); - - final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, - DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) - .shardLeaderElectionTimeoutInSeconds(3600), - commitTimeout); - - final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); - initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - leaderTestKit.doCommit(initialWriteTx.ready()); - - try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { - - final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() - .getLocalShards().get("cars").getActor(); - final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() - .getLocalShards().get("cars").getActor(); - member2Cars.tell(new StartDropMessages(AppendEntries.class), null); - member3Cars.tell(new StartDropMessages(AppendEntries.class), null); - - final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); - newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final AtomicBoolean submitDone = new AtomicBoolean(false); - executor.submit(() -> { - try { - leaderTestKit.doCommit(newTx.ready()); - submitDone.set(true); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() - .getLocalShards().get("cars").getActor(); - await().atMost(10, TimeUnit.SECONDS) - .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() - .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); - - final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() - .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); - - // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with - // new term(switching to candidate after election timeout) - leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, - "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, - -1), member3Cars); - - member2Cars.tell(new StopDropMessages(AppendEntries.class), null); - member3Cars.tell(new StopDropMessages(AppendEntries.class), null); - - await("Is tx stuck in COMMIT_PENDING") - .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); - - } - - executor.shutdownNow(); - } - private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java deleted file mode 100644 index 9d3490263c..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply; - -public interface LocalShardStore { - - GetLocalShardsReply getLocalShards(); -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java index 882d0e176b..36064145dd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java @@ -15,7 +15,7 @@ import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardM import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -public class TestDistributedDataStore extends DistributedDataStore implements LocalShardStore { +public class TestDistributedDataStore extends DistributedDataStore { public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, final Configuration configuration, @@ -32,13 +32,4 @@ public class TestDistributedDataStore extends DistributedDataStore implements Lo protected AbstractShardManagerCreator getShardManagerCreator() { return new TestShardManager.TestShardManagerCreator(); } - - @Override - public TestShardManager.GetLocalShardsReply getLocalShards() { - TestShardManager.GetLocalShardsReply reply = - (TestShardManager.GetLocalShardsReply) getActorUtils() - .executeOperation(getActorUtils().getShardManager(), TestShardManager.GetLocalShards.INSTANCE); - - return reply; - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java index 9eb20c0396..b0e744a24a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java @@ -7,51 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; -import static java.util.Objects.requireNonNull; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; public class TestShard extends Shard { - public static class Builder extends Shard.Builder { - Builder() { - super(TestShard.class); - } - } - // Message to request FrontendMetadata public static final class RequestFrontendMetadata { } - private abstract static class DropMessages { - private final Class msgClass; - - DropMessages(final Class msgClass) { - this.msgClass = requireNonNull(msgClass); - } - - final Class getMsgClass() { - return msgClass; - } - } - - public static class StartDropMessages extends DropMessages { - public StartDropMessages(final Class msgClass) { - super(msgClass); - } - } - - public static class StopDropMessages extends DropMessages { - public StopDropMessages(final Class msgClass) { - super(msgClass); - } - } - - private final Map, Predicate> dropMessages = new ConcurrentHashMap<>(); - protected TestShard(AbstractBuilder builder) { super(builder); } @@ -66,37 +29,14 @@ public class TestShard extends Shard { } } - @Override - protected void handleCommand(Object message) { - if (message instanceof StartDropMessages) { - startDropMessages(((StartDropMessages) message).getMsgClass()); - } else if (message instanceof StopDropMessages) { - stopDropMessages(((StopDropMessages) message).getMsgClass()); - } else { - dropOrHandle(message); - } + public static Shard.Builder builder() { + return new TestShard.Builder(); } - private void dropOrHandle(T message) { - Predicate drop = (Predicate) dropMessages.get(message.getClass()); - if (drop == null || !drop.test(message)) { - super.handleCommand(message); + public static class Builder extends Shard.Builder { + Builder() { + super(TestShard.class); } } - - private void startDropMessages(final Class msgClass) { - dropMessages.put(msgClass, msg -> true); - } - - void startDropMessages(final Class msgClass, final Predicate filter) { - dropMessages.put(msgClass, filter); - } - - public void stopDropMessages(final Class msgClass) { - dropMessages.remove(msgClass); - } - - public static TestShard.Builder builder() { - return new TestShard.Builder(); - } } + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java index 337c4c95b9..0783c16464 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java @@ -20,15 +20,6 @@ public class TestShardManager extends ShardManager { super(builder); } - @Override - public void handleCommand(Object message) throws Exception { - if (GetLocalShards.INSTANCE.equals(message)) { - sender().tell(new GetLocalShardsReply(localShards), null); - } else { - super.handleCommand(message); - } - } - /** * Plug into shard actor creation to replace info with our testing one. * @param info shard info. @@ -36,12 +27,10 @@ public class TestShardManager extends ShardManager { */ @Override protected ActorRef newShardActor(ShardInformation info) { - Map peerAddresses = getPeerAddresses(info.getShardName()); ShardInformation newInfo = new ShardInformation(info.getShardName(), - info.getShardId(), peerAddresses, + info.getShardId(), getPeerAddresses(info.getShardName()), info.getDatastoreContext(), - TestShard.builder() - .restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()), + TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()), peerAddressResolver); newInfo.setSchemaContext(info.getSchemaContext()); newInfo.setActiveMember(info.isActiveMember()); @@ -69,25 +58,4 @@ public class TestShardManager extends ShardManager { return Props.create(TestShardManager.class, this); } } - - public static final class GetLocalShards { - public static final GetLocalShards INSTANCE = new GetLocalShards(); - - private GetLocalShards() { - - } - } - - public static class GetLocalShardsReply { - - private final Map localShards; - - public GetLocalShardsReply(Map localShards) { - this.localShards = localShards; - } - - public Map getLocalShards() { - return localShards; - } - } } -- 2.36.6