From 26721a6f2bc3ce4d524ccb562d7e7a38b4b76068 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 19 Feb 2016 16:40:52 -0500 Subject: [PATCH] Fix forwarding in Shard.handleBatchedModifications Addressed the TODO in Shard.handleBatchedModifications to handle the case where the BatchedModifications is not the first batch when leadership changes and the BatchedModifications needs to be forwarded to the new leader. Added code in ShardCommitCoordinator to reconstruct all previous BatchedModifications messages, if needed, from the transaction DataTreeModification, honoring the max batched modification count. Change-Id: I4fecb71be91aee24072086e1565437a69c20c8d9 Signed-off-by: Tom Pantelis --- .../controller/cluster/datastore/Shard.java | 19 +++- .../datastore/ShardCommitCoordinator.java | 37 ++++++++ .../ReadyLocalTransactionSerializer.java | 72 +------------- .../AbstractBatchedModificationsCursor.java | 94 +++++++++++++++++++ ...butedDataStoreRemotingIntegrationTest.java | 38 +++++--- 5 files changed, 175 insertions(+), 85 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/AbstractBatchedModificationsCursor.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 57e85570a9..138b71c10e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -17,6 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -427,11 +428,19 @@ public class Shard extends RaftActor { messageRetrySupport.addMessageToRetry(batched, getSender(), "Could not commit transaction " + batched.getTransactionID()); } else { - // TODO: what if this is not the first batch and leadership changed in between batched messages? - // We could check if the commitCoordinator already has a cached entry and forward all the previous - // batched modifications. - LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); - leader.forward(batched, getContext()); + // If this is not the first batch and leadership changed in between batched messages, + // we need to reconstruct previous BatchedModifications from the transaction + // DataTreeModification, honoring the max batched modification count, and forward all the + // previous BatchedModifications to the new leader. + Collection newModifications = commitCoordinator.createForwardedBatchedModifications( + batched, datastoreContext.getShardBatchedModificationCount()); + + LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(), + newModifications.size(), leader); + + for(BatchedModifications bm: newModifications) { + leader.forward(bm, getContext()); + } } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 51d8d5caec..76131e257b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -14,6 +14,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -30,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.slf4j.Logger; @@ -247,6 +250,36 @@ class ShardCommitCoordinator { } } + Collection createForwardedBatchedModifications(final BatchedModifications from, + final int maxModificationsPerBatch) { + CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID()); + if(cohortEntry == null || cohortEntry.getTransaction() == null) { + return Collections.singletonList(from); + } + + cohortEntry.applyModifications(from.getModifications()); + + final LinkedList newModifications = new LinkedList<>(); + cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() { + @Override + protected BatchedModifications getModifications() { + if(newModifications.isEmpty() || + newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { + newModifications.add(new BatchedModifications(from.getTransactionID(), + from.getVersion(), from.getTransactionChainID())); + } + + return newModifications.getLast(); + } + }); + + BatchedModifications last = newModifications.getLast(); + last.setDoCommitOnReady(from.isDoCommitOnReady()); + last.setReady(from.isReady()); + last.setTotalMessagesSent(newModifications.size()); + return newModifications; + } + private void handleCanCommit(CohortEntry cohortEntry) { String transactionID = cohortEntry.getTransactionID(); @@ -621,6 +654,10 @@ class ShardCommitCoordinator { return cohort.getCandidate(); } + ReadWriteShardDataTreeTransaction getTransaction() { + return transaction; + } + int getTotalBatchedModificationsReceived() { return totalBatchedModificationsReceived; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java index fc5a99fe9a..a08a27fd04 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java @@ -8,19 +8,9 @@ package org.opendaylight.controller.cluster.datastore.messages; import akka.serialization.JSerializer; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import java.util.ArrayDeque; -import java.util.Deque; -import javax.annotation.Nonnull; import org.apache.commons.lang3.SerializationUtils; -import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; -import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; +import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; /** * Specialized message transformer, which transforms a {@link ReadyLocalTransaction} @@ -59,70 +49,16 @@ public final class ReadyLocalTransactionSerializer extends JSerializer { return SerializationUtils.deserialize(bytes); } - private static final class BatchedCursor implements DataTreeModificationCursor { - private final Deque stack = new ArrayDeque<>(); + private static final class BatchedCursor extends AbstractBatchedModificationsCursor { private final BatchedModifications message; BatchedCursor(final BatchedModifications message) { this.message = Preconditions.checkNotNull(message); - stack.push(YangInstanceIdentifier.EMPTY); } @Override - public void delete(final PathArgument child) { - message.addModification(new DeleteModification(stack.peek().node(child))); - } - - @Override - public void merge(final PathArgument child, final NormalizedNode data) { - message.addModification(new MergeModification(stack.peek().node(child), data)); - } - - @Override - public void write(final PathArgument child, final NormalizedNode data) { - message.addModification(new WriteModification(stack.peek().node(child), data)); - } - - @Override - public void enter(@Nonnull final PathArgument child) { - stack.push(stack.peek().node(child)); - } - - @Override - public void enter(@Nonnull final PathArgument... path) { - for (PathArgument arg : path) { - enter(arg); - } - } - - @Override - public void enter(@Nonnull final Iterable path) { - for (PathArgument arg : path) { - enter(arg); - } - } - - @Override - public void exit() { - stack.pop(); - } - - @Override - public void exit(final int depth) { - Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth); - for (int i = 0; i < depth; ++i) { - stack.pop(); - } - } - - @Override - public Optional> readNode(@Nonnull final PathArgument child) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void close() { - // No-op + protected BatchedModifications getModifications() { + return message; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/AbstractBatchedModificationsCursor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/AbstractBatchedModificationsCursor.java new file mode 100644 index 0000000000..02f8abb572 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/AbstractBatchedModificationsCursor.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.ArrayDeque; +import java.util.Deque; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; + +/** + * Base class for a DataTreeModificationCursor that publishes to BatchedModifications instance(s). + * + * @author Thomas Pantelis + */ +public abstract class AbstractBatchedModificationsCursor implements DataTreeModificationCursor { + private final Deque stack = new ArrayDeque<>(); + + protected AbstractBatchedModificationsCursor() { + stack.push(YangInstanceIdentifier.EMPTY); + } + + protected abstract BatchedModifications getModifications(); + + @Override + public void delete(final PathArgument child) { + getModifications().addModification(new DeleteModification(stack.peek().node(child))); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + getModifications().addModification(new MergeModification(stack.peek().node(child), data)); + } + + @Override + public void write(final PathArgument child, final NormalizedNode data) { + getModifications().addModification(new WriteModification(stack.peek().node(child), data)); + } + + @Override + public void enter(@Nonnull final PathArgument child) { + stack.push(stack.peek().node(child)); + } + + @Override + public void enter(@Nonnull final PathArgument... path) { + for (PathArgument arg : path) { + enter(arg); + } + } + + @Override + public void enter(@Nonnull final Iterable path) { + for (PathArgument arg : path) { + enter(arg); + } + } + + @Override + public void exit() { + stack.pop(); + } + + @Override + public void exit(final int depth) { + Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth); + for (int i = 0; i < depth; ++i) { + stack.pop(); + } + } + + @Override + public Optional> readNode(@Nonnull final PathArgument child) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() { + // No-op + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 6a4a6315c7..037a1dec5b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -33,6 +33,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.math.BigInteger; import java.util.Arrays; +import java.util.LinkedList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -676,14 +677,16 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { + followerDatastoreContextBuilder.shardBatchedModificationCount(2); + leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry"); // Do an initial write to get the primary shard info cached. - DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - followerTestKit.doCommit(writeTx.ready()); + DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx1.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx1.ready()); // Wait for the commit to be replicated to the follower. @@ -696,13 +699,22 @@ public class DistributedDataStoreRemotingIntegrationTest { // Create and prepare wo and rw tx's. - writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - writeTx.write(CarsModel.newCarPath("optima"), car1); + writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction(); + LinkedList cars = new LinkedList<>(); + int carIndex = 1; + for(; carIndex <= 5; carIndex++) { + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + writeTx1.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + } + + DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + carIndex++; DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); - readWriteTx.write(CarsModel.newCarPath("sportage"), car2); + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { @Override @@ -722,16 +734,18 @@ public class DistributedDataStoreRemotingIntegrationTest { // Submit tx's and enable elections on the follower so it becomes the leader, at which point the // readied tx's should get forwarded from the previous leader. - DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx1.ready(); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); + DOMStoreThreePhaseCommitCohort cohort3 = readWriteTx.ready(); sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); followerTestKit.doCommit(cohort1); followerTestKit.doCommit(cohort2); + followerTestKit.doCommit(cohort3); - verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), cars.toArray(new MapEntryNode[cars.size()])); } @Test -- 2.36.6