Handle leader change on ForwardedReadyTransaction in Shard 71/31171/3
authorTom Pantelis <tpanteli@brocade.com>
Fri, 11 Dec 2015 00:41:00 +0000 (19:41 -0500)
committerTony Tkacik <ttkacik@cisco.com>
Tue, 15 Dec 2015 13:01:19 +0000 (13:01 +0000)
When the ShardWriteTransaction actor sends the ForwardedReadyTransaction
to the Shard, if the local shard is no longer the leader it still tries
to commit the transaction. Replication fails and it ends up timing out
but a side effect is that it persista a new log entry which isn't good.

Therefore I added logic similar as was done with ReadyLocalTransaction
to forward to the leader. For ReadyLocalTransaction, a special
serializer was added to marshal it over the wire as a BatchedModifications
message. Rather then creating another serializer, I converted the
ForwardedReadyTransaction instance to a ReadyLocalTransaction instance.

Change-Id: Ia44e86606846f2af9599ad222b7dcae1bd0cf804
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

index 0bb886a..f5b9f4c 100644 (file)
@@ -229,8 +229,7 @@ public class Shard extends RaftActor {
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
-                commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
-                        getSender(), this);
+                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
             } else if (message instanceof ReadyLocalTransaction) {
                 handleReadyLocalTransaction((ReadyLocalTransaction)message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -411,7 +410,7 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
-    private void noLeaderError(String errMessage, Object message) {
+    private void noLeaderError(String errMessage) {
         // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
         // it more resilient in case we're in the process of electing a new leader.
         getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
@@ -453,7 +452,7 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
+                noLeaderError("Could not commit transaction " + batched.getTransactionID());
             }
         }
     }
@@ -492,7 +491,27 @@ public class Shard extends RaftActor {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
+                noLeaderError("Could not commit transaction " + message.getTransactionID());
+            }
+        }
+    }
+
+    private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+        if (isLeader()) {
+            failIfIsolatedLeader(getSender());
+
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader != null) {
+                LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
+
+                ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
+                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+                readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
+                leader.forward(readyLocal, getContext());
+            } else {
+                noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID());
             }
         }
     }
index 7e8d23c..39fe452 100644 (file)
@@ -264,13 +264,17 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         }
     }
 
-    protected Object prepareForwardedReadyTransaction(ShardDataTreeCohort cohort, String transactionID,
-            short version, boolean doCommitOnReady) {
+    static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
         ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
         doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
         doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class));
+        return mockParent;
+    }
+
+    protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort,
+            String transactionID, short version, boolean doCommitOnReady) {
         return new ForwardedReadyTransaction(transactionID, version,
-                new ReadWriteShardDataTreeTransaction(mockParent, transactionID,
+                new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID,
                         mock(DataTreeModification.class)), true, doCommitOnReady);
     }
 
index 8100bdc..005e904 100644 (file)
@@ -16,10 +16,12 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
+import akka.dispatch.Futures;
 import akka.pattern.AskTimeoutException;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
@@ -28,6 +30,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
+import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -40,7 +43,10 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
@@ -65,6 +71,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
@@ -525,16 +532,18 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         dataTree.setSchemaContext(SchemaContextHelper.full());
-        DataTreeModification modification = dataTree.takeSnapshot().newModification();
 
+        // Send a tx with immediate commit.
+
+        DataTreeModification modification = dataTree.takeSnapshot().newModification();
         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
 
-        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
-        new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
+        modification.ready();
 
-        String transactionID = "tx-1";
-        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
+        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true);
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -542,10 +551,101 @@ public class DistributedDataStoreRemotingIntegrationTest {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
         }
 
-        assertTrue("Expected response of type " + CommitTransactionReply.SERIALIZABLE_CLASS,
-                CommitTransactionReply.SERIALIZABLE_CLASS.equals(resp.getClass()));
+        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
+
+        // Send another tx without immediate commit.
+
+        modification = dataTree.takeSnapshot().newModification();
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+        new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
+        modification.ready();
+
+        readyLocal = new ReadyLocalTransaction("tx-2" , modification, false);
+
+        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        resp = followerTestKit.expectMsgClass(Object.class);
+        if(resp instanceof akka.actor.Status.Failure) {
+            throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+        }
+
+        assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
+
+        ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+                ((ReadyTransactionReply)resp).getCohortPath());
+
+        ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+        cohort.canCommit().get(5, TimeUnit.SECONDS);
+        cohort.preCommit().get(5, TimeUnit.SECONDS);
+        cohort.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+    }
+
+    @Test
+    public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
+        initDatastores("testForwardedReadyTransactionForwardedToLeader");
+        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+
+        Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
+        assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
+
+        carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+        DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
 
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);
+        // Send a tx with immediate commit.
+
+        DataTreeModification modification = dataTree.takeSnapshot().newModification();
+        new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
+        new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
+
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
+
+        ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
+                DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
+                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true, true);
+
+        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        Object resp = followerTestKit.expectMsgClass(Object.class);
+        if(resp instanceof akka.actor.Status.Failure) {
+            throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+        }
+
+        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
+
+        // Send another tx without immediate commit.
+
+        modification = dataTree.takeSnapshot().newModification();
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+        new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
+
+        forwardedReady = new ForwardedReadyTransaction("tx-2",
+                DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
+                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), true, false);
+
+        carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+        resp = followerTestKit.expectMsgClass(Object.class);
+        if(resp instanceof akka.actor.Status.Failure) {
+            throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
+        }
+
+        assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
+
+        ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+                ((ReadyTransactionReply)resp).getCohortPath());
+
+        ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+        cohort.canCommit().get(5, TimeUnit.SECONDS);
+        cohort.preCommit().get(5, TimeUnit.SECONDS);
+        cohort.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
     @Test(expected=NoShardLeaderException.class)
index ae607b9..7f41bb2 100644 (file)
@@ -46,7 +46,7 @@ public class ShardTestKit extends JavaTestKit {
 
     }
 
-    public String waitUntilLeader(ActorRef shard) {
+    public static String waitUntilLeader(ActorRef shard) {
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
index fc1af28..3700c57 100644 (file)
@@ -52,8 +52,10 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
     private ActorRef createShard(){
-        return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
+        ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
                 schemaContext(TestModel.createTestContext()).props());
+        ShardTestKit.waitUntilLeader(shard);
+        return shard;
     }
 
     @Test(expected = ReadFailedException.class)
index 314f497..9e97d28 100644 (file)
@@ -82,8 +82,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     private int txCounter = 0;
 
     private ActorRef createShard() {
-        return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
+        ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
                 schemaContext(TestModel.createTestContext()).props());
+        ShardTestKit.waitUntilLeader(shard);
+        return shard;
     }
 
     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {