Revert "Leader should always apply modifications as local" 52/87452/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 5 Feb 2020 14:13:46 +0000 (15:13 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 5 Feb 2020 14:14:37 +0000 (15:14 +0100)
This reverts commit 9b319f491af1c65705b69e8a182aab5006a2f959, which
broke both upgrade-recovery and pre-leader entry application. We
will re-visit the patch and try again.

JIRA: CONTROLLER-1927
JIRA: CONTROLLER-1928
Change-Id: I23e4264cf6a20e9d2369d44005c31db1ef7635c9
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java [deleted file]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java

index 6560ad7..32e2e09 100644 (file)
@@ -41,7 +41,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.VotingState;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -56,8 +55,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -436,12 +433,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
     }
 
-    /**
-     * Removes and returns the ClientRequestTracker for the specified log index.
-     * @param logIndex the log index
-     * @return the ClientRequestTracker or null if none available
-     */
-    private ClientRequestTracker removeClientRequestTracker(final long logIndex) {
+    @Override
+    protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
         final Iterator<ClientRequestTracker> it = trackers.iterator();
         while (it.hasNext()) {
             final ClientRequestTracker t = it.next();
@@ -454,28 +447,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return null;
     }
 
-    @Override
-    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
-        // first check whether a ClientRequestTracker exists for this entry.
-        // If it does that means the leader wasn't dropped before the transaction applied.
-        // That means that this transaction can be safely applied as a local transaction since we
-        // have the ClientRequestTracker.
-        final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex());
-        if (tracker != null) {
-            return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry);
-        }
-
-        // Tracker is missing, this means that we switched behaviours between replicate and applystate
-        // and became the leader again,. We still want to apply this as a local modification because
-        // we have resumed leadership with that log entry having been committed.
-        final Payload payload = entry.getData();
-        if (payload instanceof IdentifiablePayload) {
-            return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
-        }
-
-        return new ApplyState(null, null, entry);
-    }
-
     @Override
     protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
         return this;
index fd2fbd3..3440de9 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -323,6 +324,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return context.getReplicatedLog().lastIndex();
     }
 
+    /**
+     * Removes and returns the ClientRequestTracker for the specified log index.
+     * @param logIndex the log index
+     * @return the ClientRequestTracker or null if none available
+     */
+    protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
+        return null;
+    }
+
     /**
      * Returns the actual index of the entry in replicated log for the given index or -1 if not found.
      *
@@ -387,7 +397,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 // Send a local message to the local RaftActor (it's derived class to be
                 // specific to apply the log to it's index)
 
-                final ApplyState applyState = getApplyStateFor(replicatedLogEntry);
+                final ApplyState applyState;
+                final ClientRequestTracker tracker = removeClientRequestTracker(i);
+                if (tracker != null) {
+                    applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+                } else {
+                    applyState = new ApplyState(null, null, replicatedLogEntry);
+                }
 
                 log.debug("{}: Setting last applied to {}", logName(), i);
 
@@ -409,14 +425,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
     }
 
-    /**
-     * Create an ApplyState message for a particular log entry so we can determine how to apply this entry.
-     *
-     * @param entry the log entry
-     * @return ApplyState for this entry
-     */
-    abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry);
-
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof AppendEntries) {
index a8762ec..afa4689 100644 (file)
@@ -15,8 +15,6 @@ import java.util.Collection;
 import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -52,7 +50,7 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     private final Collection<String> votingPeers = new ArrayList<>();
 
-    public Candidate(final RaftActorContext context) {
+    public Candidate(RaftActorContext context) {
         super(context, RaftState.Candidate);
 
         for (PeerInfo peer: context.getPeers()) {
@@ -85,7 +83,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
+    protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
 
         log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
@@ -101,13 +99,12 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
-            final AppendEntriesReply appendEntriesReply) {
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
         return this;
     }
 
     @Override
-    protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
+    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
         log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
 
         if (requestVoteReply.isVoteGranted()) {
@@ -132,14 +129,8 @@ public class Candidate extends AbstractRaftActorBehavior {
         return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
     }
 
-
-    @Override
-    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
-        throw new IllegalStateException("A candidate should never attempt to apply " + entry);
-    }
-
     @Override
-    public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         if (message instanceof ElectionTimeout) {
             log.debug("{}: Received ElectionTimeout", logName());
 
@@ -187,7 +178,10 @@ public class Candidate extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
+
     private void startNewTerm() {
+
+
         // set voteCount back to 1 (that is voting for self)
         voteCount = 1;
 
index b642ee4..d88c30d 100644 (file)
@@ -30,7 +30,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -436,11 +435,6 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
-    @Override
-    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
-        return new ApplyState(null, null, entry);
-    }
-
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/IdentifiablePayload.java
deleted file mode 100644 (file)
index a323fba..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-/*
- * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
-
-import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.Identifier;
-
-public abstract class IdentifiablePayload<T extends Identifier> extends Payload implements Identifiable<T> {
-}
index fc65743..b970ba4 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
index 32e155b..4ca3509 100644 (file)
@@ -18,7 +18,6 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -28,8 +27,8 @@ import org.opendaylight.yangtools.concepts.Identifier;
  *
  * @author Robert Varga
  */
-public abstract class AbstractIdentifiablePayload<T extends Identifier> extends IdentifiablePayload<T>
-        implements Serializable {
+public abstract class AbstractIdentifiablePayload<T extends Identifier>
+        extends Payload implements Identifiable<T>, Serializable {
     protected abstract static class AbstractProxy<T extends Identifier> implements Externalizable {
         private static final long serialVersionUID = 1L;
         private byte[] serialized;
index af19d14..f4ac854 100644 (file)
@@ -29,7 +29,7 @@ import java.util.Map.Entry;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Variant;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -44,13 +44,10 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 @Beta
-public abstract class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
-        implements Serializable {
+public abstract class CommitTransactionPayload extends Payload implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
-    private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
-
     CommitTransactionPayload() {
 
     }
@@ -82,16 +79,7 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
     }
 
     public @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate() throws IOException {
-        Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = candidate;
-        if (localCandidate == null) {
-            synchronized (this) {
-                localCandidate = candidate;
-                if (localCandidate == null) {
-                    candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
-                }
-            }
-        }
-        return localCandidate;
+        return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
     }
 
     public final @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate(
@@ -101,15 +89,6 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
                 DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
     }
 
-    @Override
-    public TransactionIdentifier getIdentifier() {
-        try  {
-            return getCandidate().getKey();
-        } catch (IOException e) {
-            throw new IllegalStateException("Candidate deserialization failed.", e);
-        }
-    }
-
     abstract void writeBytes(ObjectOutput out) throws IOException;
 
     abstract DataInput newDataInput();
index ac87090..270c99d 100644 (file)
@@ -36,8 +36,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@VisibleForTesting
-public final class ShardInformation {
+final class ShardInformation {
     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
 
     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
@@ -90,8 +89,7 @@ public final class ShardInformation {
         return shardName;
     }
 
-    @VisibleForTesting
-    @Nullable public ActorRef getActor() {
+    @Nullable ActorRef getActor() {
         return actor;
     }
 
index 2cc30a2..fee5554 100644 (file)
@@ -357,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         configUpdateHandler.initListener(dataStore, datastoreType);
     }
 
-    void onShutDown() {
+    private void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
             if (info.getActor() != null) {
index 9b790ce..919e4d8 100644 (file)
@@ -12,17 +12,13 @@ import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
-import org.opendaylight.controller.cluster.datastore.LocalShardStore;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
-import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShards;
-import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
-public class TestClientBackedDataStore extends ClientBackedDataStore implements LocalShardStore {
-
+public class TestClientBackedDataStore extends ClientBackedDataStore {
     public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
                                      final Configuration configuration,
                                      final DatastoreContextFactory datastoreContextFactory,
@@ -39,10 +35,4 @@ public class TestClientBackedDataStore extends ClientBackedDataStore implements
     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
         return new TestShardManager.TestShardManagerCreator();
     }
-
-    @Override
-    public GetLocalShardsReply getLocalShards() {
-        final ActorUtils utils = getActorUtils();
-        return (GetLocalShardsReply) utils.executeOperation(utils.getShardManager(), GetLocalShards.INSTANCE);
-    }
 }
index 0c74c71..e403bbc 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -47,10 +46,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import org.junit.After;
@@ -71,8 +67,6 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
-import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
-import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -90,7 +84,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -1371,70 +1364,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
     }
 
-    @SuppressWarnings("IllegalCatch")
-    @Test
-    public void testRaftCallbackDuringLeadershipDrop() throws Exception {
-        final String testName = "testRaftCallbackDuringLeadershipDrop";
-        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
-
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
-
-        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
-                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
-                        .shardLeaderElectionTimeoutInSeconds(3600),
-                commitTimeout);
-
-        final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
-        initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        leaderTestKit.doCommit(initialWriteTx.ready());
-
-        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
-                testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
-
-            final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
-                    .getLocalShards().get("cars").getActor();
-            final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
-                    .getLocalShards().get("cars").getActor();
-            member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
-            member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
-
-            final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
-            newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-            final AtomicBoolean submitDone = new AtomicBoolean(false);
-            executor.submit(() -> {
-                try {
-                    leaderTestKit.doCommit(newTx.ready());
-                    submitDone.set(true);
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
-            final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
-                    .getLocalShards().get("cars").getActor();
-            await().atMost(10, TimeUnit.SECONDS)
-                    .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
-                            .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
-
-            final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
-                    .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
-
-            // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
-            // new term(switching to candidate after election timeout)
-            leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
-                    "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
-                            -1), member3Cars);
-
-            member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
-            member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
-
-            await("Is tx stuck in COMMIT_PENDING")
-                    .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
-
-        }
-
-        executor.shutdownNow();
-    }
-
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalShardStore.java
deleted file mode 100644 (file)
index 9d34902..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-/*
- * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
-
-public interface LocalShardStore {
-
-    GetLocalShardsReply getLocalShards();
-}
index 882d0e1..3606414 100644 (file)
@@ -15,7 +15,7 @@ import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardM
 import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
-public class TestDistributedDataStore extends DistributedDataStore implements LocalShardStore {
+public class TestDistributedDataStore extends DistributedDataStore {
 
     public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
                                     final Configuration configuration,
@@ -32,13 +32,4 @@ public class TestDistributedDataStore extends DistributedDataStore implements Lo
     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
         return new TestShardManager.TestShardManagerCreator();
     }
-
-    @Override
-    public TestShardManager.GetLocalShardsReply getLocalShards() {
-        TestShardManager.GetLocalShardsReply reply =
-            (TestShardManager.GetLocalShardsReply) getActorUtils()
-                .executeOperation(getActorUtils().getShardManager(), TestShardManager.GetLocalShards.INSTANCE);
-
-        return reply;
-    }
 }
index 9eb20c0..b0e744a 100644 (file)
@@ -7,51 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static java.util.Objects.requireNonNull;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 
 public class TestShard extends Shard {
-    public static class Builder extends Shard.Builder {
-        Builder() {
-            super(TestShard.class);
-        }
-    }
-
     // Message to request FrontendMetadata
     public static final class RequestFrontendMetadata {
 
     }
 
-    private abstract static class DropMessages<T> {
-        private final Class<T> msgClass;
-
-        DropMessages(final Class<T> msgClass) {
-            this.msgClass = requireNonNull(msgClass);
-        }
-
-        final Class<T> getMsgClass() {
-            return msgClass;
-        }
-    }
-
-    public static class StartDropMessages<T> extends DropMessages<T> {
-        public StartDropMessages(final Class<T> msgClass) {
-            super(msgClass);
-        }
-    }
-
-    public static class StopDropMessages<T> extends DropMessages<T> {
-        public StopDropMessages(final Class<T> msgClass) {
-            super(msgClass);
-        }
-    }
-
-    private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
-
     protected TestShard(AbstractBuilder<?, ?> builder) {
         super(builder);
     }
@@ -66,37 +29,14 @@ public class TestShard extends Shard {
         }
     }
 
-    @Override
-    protected void handleCommand(Object message) {
-        if (message instanceof StartDropMessages) {
-            startDropMessages(((StartDropMessages<?>) message).getMsgClass());
-        } else if (message instanceof StopDropMessages) {
-            stopDropMessages(((StopDropMessages<?>) message).getMsgClass());
-        } else {
-            dropOrHandle(message);
-        }
+    public static Shard.Builder builder() {
+        return new TestShard.Builder();
     }
 
-    private <T> void dropOrHandle(T message) {
-        Predicate<T> drop = (Predicate<T>) dropMessages.get(message.getClass());
-        if (drop == null || !drop.test(message)) {
-            super.handleCommand(message);
+    public static class Builder extends Shard.Builder {
+        Builder() {
+            super(TestShard.class);
         }
     }
-
-    private void startDropMessages(final Class<?> msgClass) {
-        dropMessages.put(msgClass, msg -> true);
-    }
-
-    <T> void startDropMessages(final Class<T> msgClass, final Predicate<T> filter) {
-        dropMessages.put(msgClass, filter);
-    }
-
-    public void stopDropMessages(final Class<?> msgClass) {
-        dropMessages.remove(msgClass);
-    }
-
-    public static TestShard.Builder builder() {
-        return new TestShard.Builder();
-    }
 }
+
index 337c4c9..0783c16 100644 (file)
@@ -20,15 +20,6 @@ public class TestShardManager extends ShardManager {
         super(builder);
     }
 
-    @Override
-    public void handleCommand(Object message) throws Exception {
-        if (GetLocalShards.INSTANCE.equals(message)) {
-            sender().tell(new GetLocalShardsReply(localShards), null);
-        } else {
-            super.handleCommand(message);
-        }
-    }
-
     /**
      * Plug into shard actor creation to replace info with our testing one.
      * @param info shard info.
@@ -36,12 +27,10 @@ public class TestShardManager extends ShardManager {
      */
     @Override
     protected ActorRef newShardActor(ShardInformation info) {
-        Map<String, String> peerAddresses = getPeerAddresses(info.getShardName());
         ShardInformation newInfo = new ShardInformation(info.getShardName(),
-                info.getShardId(), peerAddresses,
+                info.getShardId(), getPeerAddresses(info.getShardName()),
                 info.getDatastoreContext(),
-                TestShard.builder()
-                        .restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
+                TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
                 peerAddressResolver);
         newInfo.setSchemaContext(info.getSchemaContext());
         newInfo.setActiveMember(info.isActiveMember());
@@ -69,25 +58,4 @@ public class TestShardManager extends ShardManager {
             return Props.create(TestShardManager.class, this);
         }
     }
-
-    public static final class GetLocalShards {
-        public static final GetLocalShards INSTANCE = new GetLocalShards();
-
-        private GetLocalShards() {
-
-        }
-    }
-
-    public static class GetLocalShardsReply {
-
-        private final Map<String, ShardInformation> localShards;
-
-        public GetLocalShardsReply(Map<String, ShardInformation> localShards) {
-            this.localShards = localShards;
-        }
-
-        public Map<String, ShardInformation> getLocalShards() {
-            return localShards;
-        }
-    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.