Transaction message retry when no shard leader present 83/31183/8
authorTom Pantelis <tpanteli@brocade.com>
Fri, 11 Dec 2015 08:27:28 +0000 (03:27 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Sat, 19 Dec 2015 18:23:36 +0000 (18:23 +0000)
Implemented retry of transaction ready messages,
ForwardedReadyTransaction, ReadyLocalTransaction, and
BatchedModifications, when there's no current shard leader.

A new class, ShardTransactionMessageRetrySupport, maintains a
list of messages to retry and handles the retry logic. If there
is no leader, the message is added to the list and a timer
(2 * election time out) is started. If a leader is elected, on leader
changed, the messages are retried. If no leader is elected in time,
the messages are removed and NoShardLeaderException is returned.

Change-Id: Iade3fd245982d75ee97acf0534e9224551d9e45d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1-and-2.conf [new file with mode: 0644]

index f7b3461d333f9c157dbeb06208eed276080c297d..5b8a50e18896c0a3e10db64c10b1d6fdffdc41d8 100644 (file)
@@ -121,7 +121,7 @@ public class Shard extends RaftActor {
 
     private ShardSnapshot restoreFromSnapshot;
 
 
     private ShardSnapshot restoreFromSnapshot;
 
-
+    private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
 
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
@@ -163,8 +163,7 @@ public class Shard extends RaftActor {
 
         snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
 
 
         snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
 
-
-
+        messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
     }
 
     private void setTransactionCommitTimeout() {
     }
 
     private void setTransactionCommitTimeout() {
@@ -184,6 +183,8 @@ public class Shard extends RaftActor {
 
         super.postStop();
 
 
         super.postStop();
 
+        messageRetrySupport.close();
+
         if(txCommitTimeoutCheckSchedule != null) {
             txCommitTimeoutCheckSchedule.cancel();
         }
         if(txCommitTimeoutCheckSchedule != null) {
             txCommitTimeoutCheckSchedule.cancel();
         }
@@ -265,6 +266,8 @@ public class Shard extends RaftActor {
                 sender().tell(store.getDataTree(), self());
             } else if(message instanceof ServerRemoved){
                 context().parent().forward(message, context());
                 sender().tell(store.getDataTree(), self());
             } else if(message instanceof ServerRemoved){
                 context().parent().forward(message, context());
+            } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+                messageRetrySupport.onTimerMessage(message);
             } else {
                 super.onReceiveCommand(message);
             }
             } else {
                 super.onReceiveCommand(message);
             }
@@ -410,12 +413,6 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
-    private void noLeaderError(String errMessage) {
-        // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
-        // it more resilient in case we're in the process of electing a new leader.
-        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
-    }
-
     protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
         try {
             commitCoordinator.handleBatchedModifications(batched, sender, this);
     protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
         try {
             commitCoordinator.handleBatchedModifications(batched, sender, this);
@@ -452,7 +449,8 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + batched.getTransactionID());
+                messageRetrySupport.addMessageToRetry(batched, getSender(),
+                        "Could not commit transaction " + batched.getTransactionID());
             }
         }
     }
             }
         }
     }
@@ -491,7 +489,8 @@ public class Shard extends RaftActor {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + message.getTransactionID());
+                messageRetrySupport.addMessageToRetry(message, getSender(),
+                        "Could not commit transaction " + message.getTransactionID());
             }
         }
     }
             }
         }
     }
@@ -511,7 +510,8 @@ public class Shard extends RaftActor {
                 readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(readyLocal, getContext());
             } else {
                 readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(readyLocal, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID());
+                messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
+                        "Could not commit transaction " + forwardedReady.getTransactionID());
             }
         }
     }
             }
         }
     }
@@ -711,6 +711,10 @@ public class Shard extends RaftActor {
     @Override
     protected void onLeaderChanged(String oldLeader, String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
     @Override
     protected void onLeaderChanged(String oldLeader, String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
+
+        if(hasLeader()) {
+            messageRetrySupport.retryMessages();
+        }
     }
 
     @Override
     }
 
     @Override
index f8b8b9c95252834b2f0661a2658005124c53ef0f..887f656205182564f5e28a7027ce8a4b337c23e5 100644 (file)
@@ -7,9 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import akka.actor.UntypedActorContext;
 import akka.actor.ActorRef;
 import akka.actor.UntypedActorContext;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 
@@ -43,10 +43,15 @@ class ShardTransactionActorFactory {
         switch (type) {
         case READ_ONLY:
             transaction = dataTree.newReadOnlyTransaction(transactionID.toString(), transactionChainID);
         switch (type) {
         case READ_ONLY:
             transaction = dataTree.newReadOnlyTransaction(transactionID.toString(), transactionChainID);
+            shardMBean.incrementReadOnlyTransactionCount();
             break;
         case READ_WRITE:
             break;
         case READ_WRITE:
+            transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID);
+            shardMBean.incrementReadWriteTransactionCount();
+            break;
         case WRITE_ONLY:
             transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID);
         case WRITE_ONLY:
             transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID);
+            shardMBean.incrementWriteOnlyTransactionCount();
             break;
         default:
             throw new IllegalArgumentException("Unsupported transaction type " + type);
             break;
         default:
             throw new IllegalArgumentException("Unsupported transaction type " + type);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java
new file mode 100644 (file)
index 0000000..291867c
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Status.Failure;
+import java.io.Closeable;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardTransactionMessageRetrySupport implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
+
+    static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
+
+    private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
+    private final Shard shard;
+
+    ShardTransactionMessageRetrySupport(Shard shard) {
+        this.shard = shard;
+    }
+
+    void addMessageToRetry(Object message, ActorRef replyTo, String failureMessage) {
+        LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
+
+        MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
+
+        FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+        messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
+                messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
+
+        messagesToRetry.add(messageInfo);
+    }
+
+    void retryMessages() {
+        if(messagesToRetry.isEmpty()) {
+            return;
+        }
+
+        MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
+        messagesToRetry.clear();
+
+        for(MessageInfo info: copy) {
+            LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
+            info.retry(shard);
+        }
+    }
+
+    void onTimerMessage(Object message) {
+        MessageInfo messageInfo = (MessageInfo)message;
+
+        LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
+
+        messagesToRetry.remove(messageInfo);
+        messageInfo.timedOut(shard);
+    }
+
+    @Override
+    public void close() {
+        for(MessageInfo info: messagesToRetry) {
+            info.timedOut(shard);
+        }
+
+        messagesToRetry.clear();
+    }
+
+    private static class MessageInfo {
+        final Object message;
+        final ActorRef replyTo;
+        final String failureMessage;
+        Cancellable timer;
+
+        MessageInfo(Object message, ActorRef replyTo, String failureMessage) {
+            this.message = message;
+            this.replyTo = replyTo;
+            this.failureMessage = failureMessage;
+        }
+
+        void retry(Shard shard) {
+            timer.cancel();
+            shard.getSelf().tell(message, replyTo);
+        }
+
+        void timedOut(Shard shard) {
+            replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())), shard.getSelf());
+        }
+    }
+}
index e30d2055c904a63b4d9410f1f3da91d0cd265500..b995192113e1ad088b88c0ed7b2925d58abba42e 100644 (file)
@@ -51,4 +51,10 @@ public class ForwardedReadyTransaction {
     public boolean isDoImmediateCommit() {
         return doImmediateCommit;
     }
     public boolean isDoImmediateCommit() {
         return doImmediateCommit;
     }
+
+    @Override
+    public String toString() {
+        return "ForwardedReadyTransaction [transactionID=" + transactionID + ", doImmediateCommit=" + doImmediateCommit
+                + ", txnClientVersion=" + txnClientVersion + "]";
+    }
 }
 }
index 005e904e4c9401938fd334738b3fdfa97a478399..3c4c9046c82c4d48e0504cfe4677231e2ef50c25 100644 (file)
@@ -40,8 +40,11 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
+import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
@@ -50,6 +53,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
@@ -87,23 +92,26 @@ import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFac
  */
 public class DistributedDataStoreRemotingIntegrationTest {
 
  */
 public class DistributedDataStoreRemotingIntegrationTest {
 
-    private static final String[] SHARD_NAMES = {"cars", "people"};
+    private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
+    private static final String[] CARS = {"cars"};
 
     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
     private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
 
 
     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
     private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
 
-    private static final String MODULE_SHARDS_CONFIG_2 = "module-shards-member1-and-2.conf";
-    private static final String MODULE_SHARDS_CONFIG_3 = "module-shards-member1-and-2-and-3.conf";
+    private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
+    private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
+    private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
 
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
     private ActorSystem follower2System;
 
     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
 
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
     private ActorSystem follower2System;
 
     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
+                customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
     private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
 
     private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
@@ -129,19 +137,23 @@ public class DistributedDataStoreRemotingIntegrationTest {
         JavaTestKit.shutdownActorSystem(follower2System);
     }
 
         JavaTestKit.shutdownActorSystem(follower2System);
     }
 
-    private void initDatastores(String type) {
-        initDatastores(type, MODULE_SHARDS_CONFIG_2);
+    private void initDatastoresWithCars(String type) {
+        initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
     }
 
     }
 
-    private void initDatastores(String type, String moduleShardsConfig) {
+    private void initDatastoresWithCarsAndPeople(String type) {
+        initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
+    }
+
+    private void initDatastores(String type, String moduleShardsConfig, String[] shards) {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
-        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
+        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
 
         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
 
         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
+        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
 
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
     }
 
     private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
     }
 
     private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
@@ -171,7 +183,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
     @Test
     public void testWriteTransactionWithSingleShard() throws Exception {
         String testName = "testWriteTransactionWithSingleShard";
     @Test
     public void testWriteTransactionWithSingleShard() throws Exception {
         String testName = "testWriteTransactionWithSingleShard";
-        initDatastores(testName);
+        initDatastoresWithCars(testName);
 
         String followerCarShardName = "member-2-shard-cars-" + testName;
         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
 
         String followerCarShardName = "member-2-shard-cars-" + testName;
         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
@@ -220,7 +232,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
 
         DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
         ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
 
         DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
-                setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES);
+                setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE);
 
         verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
 
 
         verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
 
@@ -229,7 +241,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
 
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
-        initDatastores("testReadWriteTransactionWithSingleShard");
+        initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
         assertNotNull("newReadWriteTransaction returned null", rwTx);
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
         assertNotNull("newReadWriteTransaction returned null", rwTx);
@@ -255,7 +267,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testWriteTransactionWithMultipleShards() throws Exception {
 
     @Test
     public void testWriteTransactionWithMultipleShards() throws Exception {
-        initDatastores("testWriteTransactionWithMultipleShards");
+        initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards");
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -278,7 +290,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testReadWriteTransactionWithMultipleShards() throws Exception {
 
     @Test
     public void testReadWriteTransactionWithMultipleShards() throws Exception {
-        initDatastores("testReadWriteTransactionWithMultipleShards");
+        initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards");
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
         assertNotNull("newReadWriteTransaction returned null", rwTx);
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
         assertNotNull("newReadWriteTransaction returned null", rwTx);
@@ -301,7 +313,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testTransactionChainWithSingleShard() throws Exception {
 
     @Test
     public void testTransactionChainWithSingleShard() throws Exception {
-        initDatastores("testTransactionChainWithSingleShard");
+        initDatastoresWithCars("testTransactionChainWithSingleShard");
 
         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
 
 
         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
 
@@ -348,7 +360,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testTransactionChainWithMultipleShards() throws Exception{
 
     @Test
     public void testTransactionChainWithMultipleShards() throws Exception{
-        initDatastores("testTransactionChainWithMultipleShards");
+        initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards");
 
         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
 
 
         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
 
@@ -403,7 +415,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
 
     @Test
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
-        initDatastores("testChainedTransactionFailureWithSingleShard");
+        initDatastoresWithCars("testChainedTransactionFailureWithSingleShard");
 
         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
 
         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
@@ -436,7 +448,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
 
     @Test
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
-        initDatastores("testChainedTransactionFailureWithMultipleShards");
+        initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
 
         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
 
         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
@@ -474,7 +486,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
     @Test
     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
         String testName = "testSingleShardTransactionsWithLeaderChanges";
     @Test
     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
         String testName = "testSingleShardTransactionsWithLeaderChanges";
-        initDatastores(testName);
+        initDatastoresWithCars(testName);
 
         String followerCarShardName = "member-2-shard-cars-" + testName;
         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
 
         String followerCarShardName = "member-2-shard-cars-" + testName;
         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
@@ -492,12 +504,12 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         // Switch the leader to the follower
 
 
         // Switch the leader to the follower
 
-        followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
-        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
+                shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
-        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+        followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
 
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
 
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
@@ -505,9 +517,9 @@ public class DistributedDataStoreRemotingIntegrationTest {
         DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
         DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
-        newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_2, false, SHARD_NAMES);
+        newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
 
 
-        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
 
         // Write a car entry to the new leader - should switch to local Tx
 
 
         // Write a car entry to the new leader - should switch to local Tx
 
@@ -524,7 +536,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
 
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
-        initDatastores("testReadyLocalTransactionForwardedToLeader");
+        initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
 
         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
 
         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
@@ -586,7 +598,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
 
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
-        initDatastores("testForwardedReadyTransactionForwardedToLeader");
+        initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
 
         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
 
         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
@@ -648,16 +660,80 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
+    @Test
+    public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
+        initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
+
+        // Do an initial write to get the primary shard info cached.
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        // Wait for the commit to be replicated to the follower.
+
+        MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertEquals("getLastApplied", 0, raftState.getLastApplied());
+            }
+        });
+
+        // Create and prepare wo and rw tx's.
+
+        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        writeTx.write(CarsModel.newCarPath("optima"), car1);
+
+        DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+        readWriteTx.write(CarsModel.newCarPath("sportage"), car2);
+
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+            @Override
+            public void verify(ShardStats stats) {
+                assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount());
+            }
+        });
+
+        // Disable elections on the leader so it switches to follower.
+
+        sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+                customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
+                shardElectionTimeoutFactor(10));
+
+        leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
+
+        // Submit tx's and enable elections on the follower so it becomes the leader, at which point the
+        // readied tx's should get forwarded from the previous leader.
+
+        DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+        DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
+                customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
+
+        followerTestKit.doCommit(cohort1);
+        followerTestKit.doCommit(cohort2);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+    }
+
     @Test(expected=NoShardLeaderException.class)
     public void testTransactionWithIsolatedLeader() throws Throwable {
     @Test(expected=NoShardLeaderException.class)
     public void testTransactionWithIsolatedLeader() throws Throwable {
-        leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300);
+        leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
         String testName = "testTransactionWithIsolatedLeader";
         String testName = "testTransactionWithIsolatedLeader";
-        initDatastores(testName);
+        initDatastoresWithCars(testName);
 
         JavaTestKit.shutdownActorSystem(followerSystem, null, true);
 
 
         JavaTestKit.shutdownActorSystem(followerSystem, null, true);
 
-        Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext()
-                .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS);
+        MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState());
+            }
+        });
 
         DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
         DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
@@ -671,8 +747,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test(expected=AskTimeoutException.class)
     public void testTransactionWithShardLeaderNotResponding() throws Throwable {
 
     @Test(expected=AskTimeoutException.class)
     public void testTransactionWithShardLeaderNotResponding() throws Throwable {
-        followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
-        initDatastores("testTransactionWithShardLeaderNotResponding");
+        initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
 
         // Do an initial read to get the primary shard info cached.
 
 
         // Do an initial read to get the primary shard info cached.
 
@@ -702,7 +777,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test(expected=NoShardLeaderException.class)
     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
 
     @Test(expected=NoShardLeaderException.class)
     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
-        initDatastores("testTransactionWithCreateTxFailureDueToNoLeader");
+        initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
 
         // Do an initial read to get the primary shard info cached.
 
 
         // Do an initial read to get the primary shard info cached.
 
@@ -715,8 +790,8 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
 
         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
-        followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1);
-        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
+                operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -731,14 +806,13 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
 
     @Test
     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
-        followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
-        initDatastores(testName, MODULE_SHARDS_CONFIG_3);
+        initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
 
         DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder().
                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
 
         DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder().
                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
-        follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES);
+        follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS);
 
         // Do an initial read to get the primary shard info cached.
 
 
         // Do an initial read to get the primary shard info cached.
 
@@ -749,8 +823,8 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
-        followerDatastoreContextBuilder.operationTimeoutInMillis(500);
-        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
+        sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
+                operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
 
         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -760,11 +834,12 @@ public class DistributedDataStoreRemotingIntegrationTest {
     }
 
     private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
     }
 
     private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+        final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
         Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
             @Override
             public DatastoreContext answer(InvocationOnMock invocation) {
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
         Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
             @Override
             public DatastoreContext answer(InvocationOnMock invocation) {
-                return builder.build();
+                return newBuilder.build();
             }
         };
         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
             }
         };
         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
index ada0fba10eb5227f8bb2860ead76a600ccfaa7a4..c3d216be9ca9d144c51741194c06bb0550919549 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.Callable;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.Callable;
@@ -22,6 +23,7 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -32,6 +34,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 
 public class IntegrationTestKit extends ShardTestKit {
 
 
 public class IntegrationTestKit extends ShardTestKit {
 
@@ -92,7 +97,7 @@ public class IntegrationTestKit extends ShardTestKit {
         for(String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
 
         for(String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
 
-            assertNotNull("Shard was not created", shard);
+            assertNotNull("Shard was not created for " + shardName, shard);
 
             waitUntilLeader(shard);
         }
 
             waitUntilLeader(shard);
         }
@@ -101,13 +106,13 @@ public class IntegrationTestKit extends ShardTestKit {
     public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
         for(String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
     public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
         for(String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
-            assertNotNull("No local shard found", shard);
+            assertNotNull("No local shard found for " + shardName, shard);
 
             waitUntilNoLeader(shard);
         }
     }
 
 
             waitUntilNoLeader(shard);
         }
     }
 
-    private static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
+    public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
         ActorRef shard = null;
         for(int i = 0; i < 20 * 5 && shard == null; i++) {
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
         ActorRef shard = null;
         for(int i = 0; i < 20 * 5 && shard == null; i++) {
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
@@ -119,6 +124,31 @@ public class IntegrationTestKit extends ShardTestKit {
         return shard;
     }
 
         return shard;
     }
 
+    public static void verifyShardStats(DistributedDataStore datastore, String shardName, ShardStatsVerifier verifier)
+            throws Exception {
+        ActorContext actorContext = datastore.getActorContext();
+
+        Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
+        ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+
+        AssertionError lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            ShardStats shardStats = (ShardStats)actorContext.
+                    executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE);
+
+            try {
+                verifier.verify(shardStats);
+                return;
+            } catch (AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
+
     void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
             NormalizedNode<?, ?> nodeToWrite) throws Exception {
 
     void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
             NormalizedNode<?, ?> nodeToWrite) throws Exception {
 
@@ -204,4 +234,8 @@ public class IntegrationTestKit extends ShardTestKit {
             }
         }, expType);
     }
             }
         }, expType);
     }
+
+    public interface ShardStatsVerifier {
+        void verify(ShardStats stats);
+    }
 }
 }
index 25e37edf714cc8c6b2c6a6cf33ccc47b854c287f..ae751fa65d2b0a3a0917e672a1dfeab59f52a670 100644 (file)
@@ -49,6 +49,7 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -94,6 +95,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -1122,6 +1124,32 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
         }};
     }
 
+    @Test
+    public void testTransactionMessagesWithNoLeader() {
+        new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
+                shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionMessagesWithNoLeader");
+
+            waitUntilNoLeader(shard);
+
+            shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
+            Failure failure = expectMsgClass(Failure.class);
+            assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+            shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
+                    DataStoreVersions.CURRENT_VERSION, true), getRef());
+            failure = expectMsgClass(Failure.class);
+            assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+            shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
+            failure = expectMsgClass(Failure.class);
+            assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+        }};
+    }
+
     @Test
     public void testReadyWithImmediateCommit() throws Exception{
         testReadyWithImmediateCommit(true);
     @Test
     public void testReadyWithImmediateCommit() throws Exception{
         testReadyWithImmediateCommit(true);
index 7f41bb227722926f3f507d1599ec83fe90ed76fc..8f3231b4179ce2c3b086fe083c61fcf55f084574 100644 (file)
@@ -71,6 +71,7 @@ public class ShardTestKit extends JavaTestKit {
 
     public void waitUntilNoLeader(ActorRef shard) {
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
 
     public void waitUntilNoLeader(ActorRef shard) {
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+        Object lastResponse = null;
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
             try {
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
             try {
@@ -78,16 +79,25 @@ public class ShardTestKit extends JavaTestKit {
                 if(resp.getLeaderActor() == null) {
                     return;
                 }
                 if(resp.getLeaderActor() == null) {
                     return;
                 }
+
+                lastResponse = resp.getLeaderActor();
             } catch(TimeoutException e) {
             } catch(TimeoutException e) {
+                lastResponse = e;
             } catch(Exception e) {
                 System.err.println("FindLeader threw ex");
                 e.printStackTrace();
             } catch(Exception e) {
                 System.err.println("FindLeader threw ex");
                 e.printStackTrace();
+                lastResponse = e;
             }
 
             }
 
-
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
         }
 
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
         }
 
-        Assert.fail("Unexpected leader found for shard " + shard.path());
+        if(lastResponse instanceof Throwable) {
+            throw (AssertionError)new AssertionError(
+                    String.format("Unexpected error occurred from FindLeader for shard %s", shard.path())).
+                            initCause((Throwable)lastResponse);
+        }
+
+        Assert.fail(String.format("Unexpected leader %s found for shard %s", lastResponse, shard.path()));
     }
 }
\ No newline at end of file
     }
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1-and-2.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1-and-2.conf
new file mode 100644 (file)
index 0000000..c2090f4
--- /dev/null
@@ -0,0 +1,14 @@
+module-shards = [
+    {
+        name = "cars"
+        shards = [
+            {
+                name="cars"
+                replicas = [
+                    "member-1",
+                    "member-2"
+                ]
+            }
+        ]
+    }
+]
\ No newline at end of file