Handle leader change on ForwardedReadyTransaction in Shard 71/31171/3
authorTom Pantelis <tpanteli@brocade.com>
Fri, 11 Dec 2015 00:41:00 +0000 (19:41 -0500)
committerTony Tkacik <ttkacik@cisco.com>
Tue, 15 Dec 2015 13:01:19 +0000 (13:01 +0000)
When the ShardWriteTransaction actor sends the ForwardedReadyTransaction
to the Shard, if the local shard is no longer the leader it still tries
to commit the transaction. Replication fails and it ends up timing out
but a side effect is that it persista a new log entry which isn't good.

Therefore I added logic similar as was done with ReadyLocalTransaction
to forward to the leader. For ReadyLocalTransaction, a special
serializer was added to marshal it over the wire as a BatchedModifications
message. Rather then creating another serializer, I converted the
ForwardedReadyTransaction instance to a ReadyLocalTransaction instance.

Change-Id: Ia44e86606846f2af9599ad222b7dcae1bd0cf804
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

index 0bb886a12defb5a70df4f39b34866f4a2cc3890f..f5b9f4c08d6413d845344df44a49edcf9f1ffca9 100644 (file)
@@ -229,8 +229,7 @@ public class Shard extends RaftActor {
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
-                commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
-                        getSender(), this);
+                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
             } else if (message instanceof ReadyLocalTransaction) {
                 handleReadyLocalTransaction((ReadyLocalTransaction)message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
             } else if (message instanceof ReadyLocalTransaction) {
                 handleReadyLocalTransaction((ReadyLocalTransaction)message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -411,7 +410,7 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
-    private void noLeaderError(String errMessage, Object message) {
+    private void noLeaderError(String errMessage) {
         // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
         // it more resilient in case we're in the process of electing a new leader.
         getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
         // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
         // it more resilient in case we're in the process of electing a new leader.
         getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
@@ -453,7 +452,7 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
+                noLeaderError("Could not commit transaction " + batched.getTransactionID());
             }
         }
     }
             }
         }
     }
@@ -492,7 +491,27 @@ public class Shard extends RaftActor {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
+                noLeaderError("Could not commit transaction " + message.getTransactionID());
+            }
+        }
+    }
+
+    private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+        if (isLeader()) {
+            failIfIsolatedLeader(getSender());
+
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader != null) {
+                LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
+
+                ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
+                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+                readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
+                leader.forward(readyLocal, getContext());
+            } else {
+                noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID());
             }
         }
     }
             }
         }
     }
index 7e8d23c6a87c916954090486fe0ea305a7e89625..39fe452b256d46d29fcfa17bc3d2b765ec138463 100644 (file)
@@ -264,13 +264,17 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         }
     }
 
         }
     }
 
-    protected Object prepareForwardedReadyTransaction(ShardDataTreeCohort cohort, String transactionID,
-            short version, boolean doCommitOnReady) {
+    static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
         ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
         doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
         doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class));
         ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
         doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
         doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class));
+        return mockParent;
+    }
+
+    protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort,
+            String transactionID, short version, boolean doCommitOnReady) {
         return new ForwardedReadyTransaction(transactionID, version,
         return new ForwardedReadyTransaction(transactionID, version,
-                new ReadWriteShardDataTreeTransaction(mockParent, transactionID,
+                new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID,
                         mock(DataTreeModification.class)), true, doCommitOnReady);
     }
 
                         mock(DataTreeModification.class)), true, doCommitOnReady);
     }
 
index 8100bdc6a2ec7331baa31f26a02682674a60e3ad..005e904e4c9401938fd334738b3fdfa97a478399 100644 (file)
@@ -16,10 +16,12 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
+import akka.dispatch.Futures;
 import akka.pattern.AskTimeoutException;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import akka.pattern.AskTimeoutException;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
@@ -28,6 +30,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
+import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -40,7 +43,10 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
@@ -65,6 +71,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
@@ -525,16 +532,18 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         dataTree.setSchemaContext(SchemaContextHelper.full());
 
         TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         dataTree.setSchemaContext(SchemaContextHelper.full());
-        DataTreeModification modification = dataTree.takeSnapshot().newModification();
 
 
+        // Send a tx with immediate commit.
+
+        DataTreeModification modification = dataTree.takeSnapshot().newModification();
         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
 
         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
 
-        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
-        new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
+        modification.ready();
 
 
-        String transactionID = "tx-1";
-        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
+        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true);
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -542,10 +551,101 @@ public class DistributedDataStoreRemotingIntegrationTest {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
         }
 
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
         }
 
-        assertTrue("Expected response of type " + CommitTransactionReply.SERIALIZABLE_CLASS,
-                CommitTransactionReply.SERIALIZABLE_CLASS.equals(resp.getClass()));
+        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
+
+        // Send another tx without immediate commit.
+
+        modification = dataTree.takeSnapshot().newModification();
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+        new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
+        modification.ready();
+
+        readyLocal = new ReadyLocalTransaction("tx-2" , modification, false);
+
+        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        resp = followerTestKit.expectMsgClass(Object.class);
+        if(resp instanceof akka.actor.Status.Failure) {
+            throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+        }
+
+        assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
+
+        ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+                ((ReadyTransactionReply)resp).getCohortPath());
+
+        ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+        cohort.canCommit().get(5, TimeUnit.SECONDS);
+        cohort.preCommit().get(5, TimeUnit.SECONDS);
+        cohort.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+    }
+
+    @Test
+    public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
+        initDatastores("testForwardedReadyTransactionForwardedToLeader");
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+
+        Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
+        assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
+
+        carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+        DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
 
 
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);
+        // Send a tx with immediate commit.
+
+        DataTreeModification modification = dataTree.takeSnapshot().newModification();
+        new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
+        new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
+
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
+
+        ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
+                DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
+                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true, true);
+
+        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        Object resp = followerTestKit.expectMsgClass(Object.class);
+        if(resp instanceof akka.actor.Status.Failure) {
+            throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+        }
+
+        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
+
+        // Send another tx without immediate commit.
+
+        modification = dataTree.takeSnapshot().newModification();
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+        new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
+
+        forwardedReady = new ForwardedReadyTransaction("tx-2",
+                DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
+                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), true, false);
+
+        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        resp = followerTestKit.expectMsgClass(Object.class);
+        if(resp instanceof akka.actor.Status.Failure) {
+            throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+        }
+
+        assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
+
+        ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+                ((ReadyTransactionReply)resp).getCohortPath());
+
+        ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+        cohort.canCommit().get(5, TimeUnit.SECONDS);
+        cohort.preCommit().get(5, TimeUnit.SECONDS);
+        cohort.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
     @Test(expected=NoShardLeaderException.class)
     }
 
     @Test(expected=NoShardLeaderException.class)
index ae607b9b3da214094d44833de1aea900ecb36d69..7f41bb227722926f3f507d1599ec83fe90ed76fc 100644 (file)
@@ -46,7 +46,7 @@ public class ShardTestKit extends JavaTestKit {
 
     }
 
 
     }
 
-    public String waitUntilLeader(ActorRef shard) {
+    public static String waitUntilLeader(ActorRef shard) {
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
index fc1af285b5a7d2bf31be42391b4f542969a4b377..3700c57d303f8b4ed3c6a88753888a7a58eb08e3 100644 (file)
@@ -52,8 +52,10 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
     private ActorRef createShard(){
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
     private ActorRef createShard(){
-        return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
+        ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
                 schemaContext(TestModel.createTestContext()).props());
                 schemaContext(TestModel.createTestContext()).props());
+        ShardTestKit.waitUntilLeader(shard);
+        return shard;
     }
 
     @Test(expected = ReadFailedException.class)
     }
 
     @Test(expected = ReadFailedException.class)
index 314f497a2377174f9857e003eaca4c588eebb609..9e97d28c2c96454eb409fd8439c79b0f18bb7054 100644 (file)
@@ -82,8 +82,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     private int txCounter = 0;
 
     private ActorRef createShard() {
     private int txCounter = 0;
 
     private ActorRef createShard() {
-        return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
+        ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
                 schemaContext(TestModel.createTestContext()).props());
                 schemaContext(TestModel.createTestContext()).props());
+        ShardTestKit.waitUntilLeader(shard);
+        return shard;
     }
 
     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
     }
 
     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {