Fix forwarding in Shard.handleBatchedModifications 21/35121/6
author Tom Pantelis <tpanteli@brocade.com>
Fri, 19 Feb 2016 21:40:52 +0000 (16:40 -0500)
committer Gerrit Code Review <gerrit@opendaylight.org>
Fri, 18 Mar 2016 19:56:15 +0000 (19:56 +0000)
Addressed the TODO in Shard.handleBatchedModifications to handle the
case where a BatchedModifications message is not the first batch for its
transaction and leadership changes between batches, so the message must be
forwarded to the new leader. Added code in ShardCommitCoordinator to
reconstruct all previous BatchedModifications messages, if needed, from the
transaction's DataTreeModification, honoring the max batched modification
count (see the sketch below).

Change-Id: I4fecb71be91aee24072086e1565437a69c20c8d9
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
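
The forwarding path added below boils down to replaying the transaction's
accumulated modifications and re-chunking them into batches of at most
shardBatchedModificationCount entries, with only the final batch carrying the
ready/doCommitOnReady flags and the total message count. The following is a
minimal, self-contained sketch of that re-chunking logic only; the Batch class
and String placeholders are hypothetical stand-ins for the real
BatchedModifications and Modification types, and the actual implementation is
createForwardedBatchedModifications in ShardCommitCoordinator below.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Stand-in for BatchedModifications: a chunk of modifications plus readiness metadata.
    final class Batch {
        final List<String> modifications = new ArrayList<>();
        boolean ready;
        boolean doCommitOnReady;
        int totalMessagesSent;
    }

    final class RebatchSketch {
        // Re-chunk all modifications of a transaction into batches of at most maxPerBatch,
        // marking only the last batch as ready, mirroring the forwarding logic in this change.
        static List<Batch> rebatch(List<String> allModifications, int maxPerBatch,
                boolean ready, boolean doCommitOnReady) {
            List<Batch> batches = new ArrayList<>();
            batches.add(new Batch());
            for (String mod : allModifications) {
                Batch current = batches.get(batches.size() - 1);
                if (current.modifications.size() >= maxPerBatch) {
                    current = new Batch();
                    batches.add(current);
                }
                current.modifications.add(mod);
            }
            Batch last = batches.get(batches.size() - 1);
            last.ready = ready;
            last.doCommitOnReady = doCommitOnReady;
            last.totalMessagesSent = batches.size();
            return batches;
        }

        public static void main(String[] args) {
            List<Batch> batches = rebatch(Arrays.asList("w1", "w2", "w3", "w4", "w5"), 2, true, false);
            System.out.println(batches.size());        // 3 batches of sizes 2, 2, 1
            System.out.println(batches.get(2).ready);  // true (only the last batch is ready)
        }
    }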
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/AbstractBatchedModificationsCursor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 57e85570a903fb7923dc80b1478b693071d8b848..138b71c10e111969c59777eedffce7c384f5afdd 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -427,11 +428,19 @@ public class Shard extends RaftActor {
                 messageRetrySupport.addMessageToRetry(batched, getSender(),
                         "Could not commit transaction " + batched.getTransactionID());
             } else {
-                // TODO: what if this is not the first batch and leadership changed in between batched messages?
-                // We could check if the commitCoordinator already has a cached entry and forward all the previous
-                // batched modifications.
-                LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
-                leader.forward(batched, getContext());
+                // If this is not the first batch and leadership changed in between batched messages,
+                // we need to reconstruct previous BatchedModifications from the transaction
+                // DataTreeModification, honoring the max batched modification count, and forward all the
+                // previous BatchedModifications to the new leader.
+                Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
+                        batched, datastoreContext.getShardBatchedModificationCount());
+
+                LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
+                        newModifications.size(), leader);
+
+                for(BatchedModifications bm: newModifications) {
+                    leader.forward(bm, getContext());
+                }
             }
         }
     }
index 51d8d5caec18e0c94a22520fb23d1b9708acdbf4..76131e257b893a1671f74f01b1e508e2689ca731 100644 (file)
@@ -14,6 +14,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -30,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
@@ -247,6 +250,36 @@ class ShardCommitCoordinator {
         }
     }
 
+    Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
+            final int maxModificationsPerBatch) {
+        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+        if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+            return Collections.singletonList(from);
+        }
+
+        cohortEntry.applyModifications(from.getModifications());
+
+        final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+        cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
+            @Override
+            protected BatchedModifications getModifications() {
+                if(newModifications.isEmpty() ||
+                        newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+                    newModifications.add(new BatchedModifications(from.getTransactionID(),
+                            from.getVersion(), from.getTransactionChainID()));
+                }
+
+                return newModifications.getLast();
+            }
+        });
+
+        BatchedModifications last = newModifications.getLast();
+        last.setDoCommitOnReady(from.isDoCommitOnReady());
+        last.setReady(from.isReady());
+        last.setTotalMessagesSent(newModifications.size());
+        return newModifications;
+    }
+
     private void handleCanCommit(CohortEntry cohortEntry) {
         String transactionID = cohortEntry.getTransactionID();
 
@@ -621,6 +654,10 @@ class ShardCommitCoordinator {
             return cohort.getCandidate();
         }
 
+        ReadWriteShardDataTreeTransaction getTransaction() {
+            return transaction;
+        }
+
         int getTotalBatchedModificationsReceived() {
             return totalBatchedModificationsReceived;
         }
index fc5a99fe9adf88d05a386429345e30ead5944f33..a08a27fd04d495529754f964cf5e9a8b4447bf54 100644 (file)
@@ -8,19 +8,9 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import akka.serialization.JSerializer;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import javax.annotation.Nonnull;
 import org.apache.commons.lang3.SerializationUtils;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 
 /**
  * Specialized message transformer, which transforms a {@link ReadyLocalTransaction}
@@ -59,70 +49,16 @@ public final class ReadyLocalTransactionSerializer extends JSerializer {
         return SerializationUtils.deserialize(bytes);
     }
 
-    private static final class BatchedCursor implements DataTreeModificationCursor {
-        private final Deque<YangInstanceIdentifier> stack = new ArrayDeque<>();
+    private static final class BatchedCursor extends AbstractBatchedModificationsCursor {
         private final BatchedModifications message;
 
         BatchedCursor(final BatchedModifications message) {
             this.message = Preconditions.checkNotNull(message);
-            stack.push(YangInstanceIdentifier.EMPTY);
         }
 
         @Override
-        public void delete(final PathArgument child) {
-            message.addModification(new DeleteModification(stack.peek().node(child)));
-        }
-
-        @Override
-        public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
-            message.addModification(new MergeModification(stack.peek().node(child), data));
-        }
-
-        @Override
-        public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
-            message.addModification(new WriteModification(stack.peek().node(child), data));
-        }
-
-        @Override
-        public void enter(@Nonnull final PathArgument child) {
-            stack.push(stack.peek().node(child));
-        }
-
-        @Override
-        public void enter(@Nonnull final PathArgument... path) {
-            for (PathArgument arg : path) {
-                enter(arg);
-            }
-        }
-
-        @Override
-        public void enter(@Nonnull final Iterable<PathArgument> path) {
-            for (PathArgument arg : path) {
-                enter(arg);
-            }
-        }
-
-        @Override
-        public void exit() {
-            stack.pop();
-        }
-
-        @Override
-        public void exit(final int depth) {
-            Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth);
-            for (int i = 0; i < depth; ++i) {
-                stack.pop();
-            }
-        }
-
-        @Override
-        public Optional<NormalizedNode<?, ?>> readNode(@Nonnull final PathArgument child) {
-            throw new UnsupportedOperationException("Not implemented");
-        }
-
-        @Override
-        public void close() {
-            // No-op
+        protected BatchedModifications getModifications() {
+            return message;
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/AbstractBatchedModificationsCursor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/AbstractBatchedModificationsCursor.java
new file mode 100644 (file)
index 0000000..02f8abb
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+/**
+ * Base class for a DataTreeModificationCursor that publishes to BatchedModifications instance(s).
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractBatchedModificationsCursor implements DataTreeModificationCursor {
+    private final Deque<YangInstanceIdentifier> stack = new ArrayDeque<>();
+
+    protected AbstractBatchedModificationsCursor() {
+        stack.push(YangInstanceIdentifier.EMPTY);
+    }
+
+    protected abstract BatchedModifications getModifications();
+
+    @Override
+    public void delete(final PathArgument child) {
+        getModifications().addModification(new DeleteModification(stack.peek().node(child)));
+    }
+
+    @Override
+    public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+        getModifications().addModification(new MergeModification(stack.peek().node(child), data));
+    }
+
+    @Override
+    public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+        getModifications().addModification(new WriteModification(stack.peek().node(child), data));
+    }
+
+    @Override
+    public void enter(@Nonnull final PathArgument child) {
+        stack.push(stack.peek().node(child));
+    }
+
+    @Override
+    public void enter(@Nonnull final PathArgument... path) {
+        for (PathArgument arg : path) {
+            enter(arg);
+        }
+    }
+
+    @Override
+    public void enter(@Nonnull final Iterable<PathArgument> path) {
+        for (PathArgument arg : path) {
+            enter(arg);
+        }
+    }
+
+    @Override
+    public void exit() {
+        stack.pop();
+    }
+
+    @Override
+    public void exit(final int depth) {
+        Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth);
+        for (int i = 0; i < depth; ++i) {
+            stack.pop();
+        }
+    }
+
+    @Override
+    public Optional<NormalizedNode<?, ?>> readNode(@Nonnull final PathArgument child) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void close() {
+        // No-op
+    }
+}
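
For orientation, the new base class is used by supplying the target
BatchedModifications via getModifications() and letting applyToCursor replay a
DataTreeModification into it. Below is a condensed restatement of the
BatchedCursor subclass from ReadyLocalTransactionSerializer above, assuming the
sal-distributed-datastore and yangtools artifacts are on the classpath; it is a
sketch for illustration, not part of this change.

    import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
    import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;

    // Publishes every delete/merge/write visited by the cursor into a single batch.
    final class SingleBatchCursor extends AbstractBatchedModificationsCursor {
        private final BatchedModifications batch;

        SingleBatchCursor(final BatchedModifications batch) {
            this.batch = batch;
        }

        @Override
        protected BatchedModifications getModifications() {
            return batch;
        }
    }

    // Typical use, as in ShardCommitCoordinator.createForwardedBatchedModifications:
    //     dataTreeModification.applyToCursor(new SingleBatchCursor(batch));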
index 6a4a6315c7d4dfdf3789316fbef32fdb3d07cc87..037a1dec5b076dc9a073ce2d796d4ebf078a09e5 100644 (file)
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -676,14 +677,16 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @Test
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
+        followerDatastoreContextBuilder.shardBatchedModificationCount(2);
+        leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
 
         // Do an initial write to get the primary shard info cached.
 
-        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
-        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-        followerTestKit.doCommit(writeTx.ready());
+        DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+        writeTx1.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx1.ready());
 
         // Wait for the commit to be replicated to the follower.
 
@@ -696,13 +699,22 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         // Create and prepare wo and rw tx's.
 
-        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
-        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
-        writeTx.write(CarsModel.newCarPath("optima"), car1);
+        writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+        LinkedList<MapEntryNode> cars = new LinkedList<>();
+        int carIndex = 1;
+        for(; carIndex <= 5; carIndex++) {
+            cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+            writeTx1.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+        }
+
+        DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
+        cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+        writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+        carIndex++;
 
         DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
-        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
-        readWriteTx.write(CarsModel.newCarPath("sportage"), car2);
+        cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+        readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
 
         IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
             @Override
@@ -722,16 +734,18 @@ public class DistributedDataStoreRemotingIntegrationTest {
         // Submit tx's and enable elections on the follower so it becomes the leader, at which point the
         // readied tx's should get forwarded from the previous leader.
 
-        DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
-        DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+        DOMStoreThreePhaseCommitCohort cohort1 = writeTx1.ready();
+        DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
+        DOMStoreThreePhaseCommitCohort cohort3 = readWriteTx.ready();
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
                 customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
 
         followerTestKit.doCommit(cohort1);
         followerTestKit.doCommit(cohort2);
+        followerTestKit.doCommit(cohort3);
 
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), cars.toArray(new MapEntryNode[cars.size()]));
     }
 
     @Test