import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
messageRetrySupport.addMessageToRetry(batched, getSender(),
"Could not commit transaction " + batched.getTransactionID());
} else {
- // TODO: what if this is not the first batch and leadership changed in between batched messages?
- // We could check if the commitCoordinator already has a cached entry and forward all the previous
- // batched modifications.
- LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
- leader.forward(batched, getContext());
+ // If this is not the first batch and leadership changed in between batched messages,
+ // we need to reconstruct previous BatchedModifications from the transaction
+ // DataTreeModification, honoring the max batched modification count, and forward all the
+ // previous BatchedModifications to the new leader.
+ Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
+ batched, datastoreContext.getShardBatchedModificationCount());
+
+ LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
+ newModifications.size(), leader);
+
+ for(BatchedModifications bm: newModifications) {
+ leader.forward(bm, getContext());
+ }
}
}
}
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
}
}
+    /**
+     * Reconstructs the full set of BatchedModifications messages for a transaction whose earlier
+     * batches were applied locally before leadership changed, so they can all be forwarded to the
+     * new leader.
+     *
+     * @param from the last BatchedModifications message received for the transaction
+     * @param maxModificationsPerBatch maximum number of modifications to place in each message
+     * @return the reconstructed messages, or a singleton list containing {@code from} if no cohort
+     *         entry with an associated transaction is cached for the transaction ID
+     */
+    Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
+            final int maxModificationsPerBatch) {
+        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+        if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+            // Nothing cached locally to reconstruct from - forward the message as-is.
+            return Collections.singletonList(from);
+        }
+
+        // Fold the latest modifications into the cached transaction so the snapshot below
+        // reflects everything received so far for this transaction.
+        cohortEntry.applyModifications(from.getModifications());
+
+        final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+        cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
+            @Override
+            protected BatchedModifications getModifications() {
+                // Start a new message when none exists yet or the current one is full.
+                if(newModifications.isEmpty() ||
+                        newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+                    newModifications.add(new BatchedModifications(from.getTransactionID(),
+                            from.getVersion(), from.getTransactionChainID()));
+                }
+
+                return newModifications.getLast();
+            }
+        });
+
+        if(newModifications.isEmpty()) {
+            // The snapshot yielded no modifications - without this guard, getLast() below would
+            // throw NoSuchElementException and the ready/doCommit flags would be lost. Emit one
+            // empty message so that state is still conveyed to the new leader.
+            newModifications.add(new BatchedModifications(from.getTransactionID(),
+                    from.getVersion(), from.getTransactionChainID()));
+        }
+
+        // Carry the ready/commit state from the original message on the last batch only.
+        BatchedModifications last = newModifications.getLast();
+        last.setDoCommitOnReady(from.isDoCommitOnReady());
+        last.setReady(from.isReady());
+        last.setTotalMessagesSent(newModifications.size());
+        return newModifications;
+    }
+
private void handleCanCommit(CohortEntry cohortEntry) {
String transactionID = cohortEntry.getTransactionID();
return cohort.getCandidate();
}
+    /**
+     * Returns the ReadWriteShardDataTreeTransaction associated with this cohort entry; may be
+     * null (callers are expected to check).
+     */
+    ReadWriteShardDataTreeTransaction getTransaction() {
+        return transaction;
+    }
+
+    // Accessor for the totalBatchedModificationsReceived counter maintained by this entry.
int getTotalBatchedModificationsReceived() {
return totalBatchedModificationsReceived;
}
package org.opendaylight.controller.cluster.datastore.messages;
import akka.serialization.JSerializer;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
/**
* Specialized message transformer, which transforms a {@link ReadyLocalTransaction}
return SerializationUtils.deserialize(bytes);
}
- private static final class BatchedCursor implements DataTreeModificationCursor {
- private final Deque<YangInstanceIdentifier> stack = new ArrayDeque<>();
+ private static final class BatchedCursor extends AbstractBatchedModificationsCursor {
private final BatchedModifications message;
BatchedCursor(final BatchedModifications message) {
this.message = Preconditions.checkNotNull(message);
- stack.push(YangInstanceIdentifier.EMPTY);
}
@Override
- public void delete(final PathArgument child) {
- message.addModification(new DeleteModification(stack.peek().node(child)));
- }
-
- @Override
- public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- message.addModification(new MergeModification(stack.peek().node(child), data));
- }
-
- @Override
- public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- message.addModification(new WriteModification(stack.peek().node(child), data));
- }
-
- @Override
- public void enter(@Nonnull final PathArgument child) {
- stack.push(stack.peek().node(child));
- }
-
- @Override
- public void enter(@Nonnull final PathArgument... path) {
- for (PathArgument arg : path) {
- enter(arg);
- }
- }
-
- @Override
- public void enter(@Nonnull final Iterable<PathArgument> path) {
- for (PathArgument arg : path) {
- enter(arg);
- }
- }
-
- @Override
- public void exit() {
- stack.pop();
- }
-
- @Override
- public void exit(final int depth) {
- Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth);
- for (int i = 0; i < depth; ++i) {
- stack.pop();
- }
- }
-
- @Override
- public Optional<NormalizedNode<?, ?>> readNode(@Nonnull final PathArgument child) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public void close() {
- // No-op
+ protected BatchedModifications getModifications() {
+ return message;
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+/**
+ * Base class for a DataTreeModificationCursor that publishes to BatchedModifications instance(s).
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractBatchedModificationsCursor implements DataTreeModificationCursor {
+    // Tracks the cursor's current position; the top of the stack is the parent path used to
+    // build the YangInstanceIdentifier for delete/merge/write operations.
+    private final Deque<YangInstanceIdentifier> stack = new ArrayDeque<>();
+
+    protected AbstractBatchedModificationsCursor() {
+        // Seed the stack with the root so peek() is always valid before any enter().
+        stack.push(YangInstanceIdentifier.EMPTY);
+    }
+
+    /**
+     * Returns the BatchedModifications instance to which the next modification is added.
+     * Subclasses may return the same instance every time or rotate instances (e.g. to honor a
+     * maximum batch size).
+     */
+    protected abstract BatchedModifications getModifications();
+
+    @Override
+    public void delete(final PathArgument child) {
+        getModifications().addModification(new DeleteModification(stack.peek().node(child)));
+    }
+
+    @Override
+    public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+        getModifications().addModification(new MergeModification(stack.peek().node(child), data));
+    }
+
+    @Override
+    public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+        getModifications().addModification(new WriteModification(stack.peek().node(child), data));
+    }
+
+    @Override
+    public void enter(@Nonnull final PathArgument child) {
+        // Descend one level: push the child's full path as the new current position.
+        stack.push(stack.peek().node(child));
+    }
+
+    @Override
+    public void enter(@Nonnull final PathArgument... path) {
+        for (PathArgument arg : path) {
+            enter(arg);
+        }
+    }
+
+    @Override
+    public void enter(@Nonnull final Iterable<PathArgument> path) {
+        for (PathArgument arg : path) {
+            enter(arg);
+        }
+    }
+
+    @Override
+    public void exit() {
+        stack.pop();
+    }
+
+    @Override
+    public void exit(final int depth) {
+        // Strict check (depth < size) ensures the root pushed by the constructor is never popped.
+        Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth);
+        for (int i = 0; i < depth; ++i) {
+            stack.pop();
+        }
+    }
+
+    @Override
+    public Optional<NormalizedNode<?, ?>> readNode(@Nonnull final PathArgument child) {
+        // This cursor only records modifications; it has no backing data tree to read from.
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void close() {
+        // No-op
+    }
+}
import com.typesafe.config.ConfigFactory;
import java.math.BigInteger;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@Test
public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
+ followerDatastoreContextBuilder.shardBatchedModificationCount(2);
+ leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
// Do an initial write to get the primary shard info cached.
- DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
- writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- followerTestKit.doCommit(writeTx.ready());
+ DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+ writeTx1.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx1.ready());
// Wait for the commit to be replicated to the follower.
// Create and prepare wo and rw tx's.
- writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
- MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
- writeTx.write(CarsModel.newCarPath("optima"), car1);
+ writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+ LinkedList<MapEntryNode> cars = new LinkedList<>();
+ int carIndex = 1;
+ for(; carIndex <= 5; carIndex++) {
+ cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ writeTx1.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ }
+
+ DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
+ cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ carIndex++;
DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
- MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
- readWriteTx.write(CarsModel.newCarPath("sportage"), car2);
+ cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
@Override
// Submit tx's and enable elections on the follower so it becomes the leader, at which point the
// readied tx's should get forwarded from the previous leader.
- DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
- DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+ DOMStoreThreePhaseCommitCohort cohort1 = writeTx1.ready();
+ DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
+ DOMStoreThreePhaseCommitCohort cohort3 = readWriteTx.ready();
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
followerTestKit.doCommit(cohort1);
followerTestKit.doCommit(cohort2);
+ followerTestKit.doCommit(cohort3);
- verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+ verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), cars.toArray(new MapEntryNode[cars.size()]));
}
@Test