//
if(isLeader()) {
try {
- BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
- sender().tell(reply, self());
+ boolean ready = commitCoordinator.handleTransactionModifications(batched);
+ if(ready) {
+ sender().tell(READY_TRANSACTION_REPLY, self());
+ } else {
+ sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+ }
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
+ if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+ ActorRef replyActorPath = getSelf();
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+ ready.getTxnClientVersion());
getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
-
+ readyTransactionReply, getSelf());
} else {
-
- getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
- READY_TRANSACTION_REPLY, getSelf());
+ getSender().tell(READY_TRANSACTION_REPLY, getSelf());
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
*
* @throws ExecutionException if an error occurs loading the cache
*/
- public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+ public boolean handleTransactionModifications(BatchedModifications batched)
throws ExecutionException {
CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry.applyModifications(batched.getModifications());
- String cohortPath = null;
if(batched.isReady()) {
if(log.isDebugEnabled()) {
log.debug("{}: Readying Tx {}, client version {}", name,
}
cohortEntry.ready(cohortDecorator);
- cohortPath = shardActorPath;
}
- return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+ return batched.isReady();
}
/**
modification.apply(transaction);
}
- getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+ if(batched.isReady()) {
+ readyTransaction(transaction, false);
+ } else {
+ getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+ }
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called", getIdentifier());
- // Send the remaining batched modifications if any.
+ // Send the remaining batched modifications, if any, with the ready flag set.
- sendBatchedModifications();
-
- // Send the ReadyTransaction message to the Tx actor.
-
- Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true);
- return transformReadyReply(readyReplyFuture);
+ return transformReadyReply(lastModificationsFuture);
}
protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
public ActorSelection checkedApply(Object serializedReadyReply) {
LOG.debug("Tx {} readyTransaction", getIdentifier());
- // At this point the rwady operation succeeded and we need to extract the cohort
+ // At this point the ready operation succeeded and we need to extract the cohort
// actor path from the reply.
- if (serializedReadyReply instanceof ReadyTransactionReply) {
- return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
- } else if(serializedReadyReply instanceof BatchedModificationsReply) {
- return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
- } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- String cohortPath = deserializeCohortPath(reply.getCohortPath());
- return actorContext.actorSelection(cohortPath);
- } else {
- // Throwing an exception here will fail the Future.
- throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
- getIdentifier(), serializedReadyReply.getClass()));
+ if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+ ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+ return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
}
+
+ // Throwing an exception here will fail the Future.
+ throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+ getIdentifier(), serializedReadyReply.getClass()));
}
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
- protected String deserializeCohortPath(String cohortPath) {
- return cohortPath;
+ protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+ return readyTxReply.getCohortPath();
+ }
+
+ private BatchedModifications newBatchedModifications() {
+ return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
}
private void batchModification(Modification modification) {
if(batchedModifications == null) {
- batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
- transactionChainId);
+ batchedModifications = newBatchedModifications();
}
batchedModifications.addModification(modification);
protected Future<Object> sendBatchedModifications(boolean ready) {
Future<Object> sent = null;
- if(batchedModifications != null) {
+ if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+ if(batchedModifications == null) {
+ batchedModifications = newBatchedModifications();
+ }
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
batchedModifications.getModifications().size(), ready);
batchedModifications.setReady(ready);
sent = executeOperationAsync(batchedModifications);
- batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
- transactionChainId);
+ if(ready) {
+ batchedModifications = null;
+ } else {
+ batchedModifications = newBatchedModifications();
+ }
}
return sent;
return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
operationCompleter);
- } else if (transactionType == TransactionType.WRITE_ONLY &&
- actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
- actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
} else {
return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Context for a write-only transaction.
- *
- * @author Thomas Pantelis
- */
-public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
- private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
-
- public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
- String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationCompleter operationCompleter) {
- super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
- remoteTransactionVersion, operationCompleter);
- }
-
- @Override
- public Future<ActorSelection> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called", getIdentifier());
-
- // Send the remaining batched modifications if any.
-
- Future<Object> lastModificationsFuture = sendBatchedModifications(true);
-
- return transformReadyReply(lastModificationsFuture);
- }
-}
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
}
@Override
- protected String deserializeCohortPath(String cohortPath) {
+ protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
// In base Helium we used to return the local path of the actor which represented
// a remote ThreePhaseCommitCohort. The local path would then be converted to
// a remote path using this resolvePath method. To maintain compatibility with
// we could remove this code to resolvePath and just use the cohortPath as the
// resolved cohortPath
if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- return getActorContext().resolvePath(transactionPath, cohortPath);
+ return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath());
}
- return cohortPath;
+ return readyTxReply.getCohortPath();
}
}
public class BatchedModificationsReply extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
- private static final byte COHORT_PATH_NOT_PRESENT = 0;
- private static final byte COHORT_PATH_PRESENT = 1;
-
private int numBatched;
- private String cohortPath;
public BatchedModificationsReply() {
}
this.numBatched = numBatched;
}
- public BatchedModificationsReply(int numBatched, String cohortPath) {
- this.numBatched = numBatched;
- this.cohortPath = cohortPath;
- }
-
public int getNumBatched() {
return numBatched;
}
- public String getCohortPath() {
- return cohortPath;
- }
-
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
numBatched = in.readInt();
-
- if(in.readByte() == COHORT_PATH_PRESENT) {
- cohortPath = in.readUTF();
- }
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(numBatched);
-
- if(cohortPath != null) {
- out.writeByte(COHORT_PATH_PRESENT);
- out.writeUTF(cohortPath);
- } else {
- out.writeByte(COHORT_PATH_NOT_PRESENT);
- }
}
@Override
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
- .append(cohortPath).append("]");
+ builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append("]");
return builder.toString();
}
}
private final DOMStoreThreePhaseCommitCohort cohort;
private final Modification modification;
private final boolean returnSerialized;
- private final int txnClientVersion;
+ private final short txnClientVersion;
- public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+ public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
DOMStoreThreePhaseCommitCohort cohort, Modification modification,
boolean returnSerialized) {
this.transactionID = transactionID;
return returnSerialized;
}
- public int getTxnClientVersion() {
+ public short getTxnClientVersion() {
return txnClientVersion;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+@Deprecated
public class ReadyTransaction implements SerializableMessage{
public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
ShardTransactionMessages.ReadyTransaction.class;
package org.opendaylight.controller.cluster.datastore.messages;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class ReadyTransactionReply implements SerializableMessage {
+public class ReadyTransactionReply extends VersionedExternalizableMessage {
+ private static final long serialVersionUID = 1L;
+
public static final Class<ShardTransactionMessages.ReadyTransactionReply> SERIALIZABLE_CLASS =
ShardTransactionMessages.ReadyTransactionReply.class;
- private final String cohortPath;
+ private String cohortPath;
+
+ public ReadyTransactionReply() {
+ }
public ReadyTransactionReply(String cohortPath) {
+ this(cohortPath, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public ReadyTransactionReply(String cohortPath, short version) {
+ super(version);
this.cohortPath = cohortPath;
}
}
@Override
- public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
- return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
- .setActorPath(cohortPath)
- .build();
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ cohortPath = in.readUTF();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeUTF(cohortPath);
+ }
+
+ @Override
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+ return this;
+ } else {
+ return ShardTransactionMessages.ReadyTransactionReply.newBuilder().setActorPath(cohortPath).build();
+ }
}
public static ReadyTransactionReply fromSerializable(Object serializable) {
- ShardTransactionMessages.ReadyTransactionReply o =
- (ShardTransactionMessages.ReadyTransactionReply) serializable;
+ if(serializable instanceof ReadyTransactionReply) {
+ return (ReadyTransactionReply)serializable;
+ } else {
+ ShardTransactionMessages.ReadyTransactionReply o =
+ (ShardTransactionMessages.ReadyTransactionReply) serializable;
+ return new ReadyTransactionReply(o.getActorPath(), DataStoreVersions.HELIUM_2_VERSION);
+ }
+ }
- return new ReadyTransactionReply(o.getActorPath());
+ public static boolean isSerializedType(Object message) {
+ return message instanceof ReadyTransactionReply ||
+ message instanceof ShardTransactionMessages.ReadyTransactionReply;
}
}
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
return argThat(matcher);
}
- protected Future<Object> readySerializedTxReply(String path) {
- return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
- }
-
protected Future<Object> readyTxReply(String path) {
return Futures.successful((Object)new ReadyTransactionReply(path));
}
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
- protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
- Future<BatchedModificationsReply> replyFuture = Futures.successful(
- new BatchedModificationsReply(count, actorRef.path().toString()));
- doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+ protected void expectBatchedModificationsReady(ActorRef actorRef) {
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
any(ActorSelection.class), isA(BatchedModifications.class));
}
- protected void expectReadyTransaction(ActorRef actorRef) {
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
- }
-
protected void expectFailedBatchedModifications(ActorRef actorRef) {
doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
waitUntilLeader(shard);
- final String transactionID1 = "tx1";
- final String transactionID2 = "tx2";
- final String transactionID3 = "tx3";
+ // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
- final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
- final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
- final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
- ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
- @Override
- public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
- if(transactionID.equals(transactionID1)) {
- mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
- return mockCohort1.get();
- } else if(transactionID.equals(transactionID2)) {
- mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
- return mockCohort2.get();
- } else {
- mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
- return mockCohort3.get();
- }
- }
- };
+ InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+ String transactionID2 = "tx2";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+ modification2);
+
+ String transactionID3 = "tx3";
+ MutableCompositeModification modification3 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+ modification3);
long timeoutSec = 5;
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- // Send a BatchedModifications message for the first transaction.
+ // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+ // by the ShardTransaction.
- shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
- assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
- assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
+ ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+ expectMsgClass(duration, ReadyTransactionReply.class));
+ assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the CanCommitTransaction message for the first Tx.
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send BatchedModifications for the next 2 Tx's.
+ // Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
- TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// processed after the first Tx completes.
assertEquals("Commits complete", true, done);
- InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
- inOrder.verify(mockCohort1.get()).canCommit();
- inOrder.verify(mockCohort1.get()).preCommit();
- inOrder.verify(mockCohort1.get()).commit();
- inOrder.verify(mockCohort2.get()).canCommit();
- inOrder.verify(mockCohort2.get()).preCommit();
- inOrder.verify(mockCohort2.get()).commit();
- inOrder.verify(mockCohort3.get()).canCommit();
- inOrder.verify(mockCohort3.get()).preCommit();
- inOrder.verify(mockCohort3.get()).commit();
+ InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+ inOrder.verify(cohort1).canCommit();
+ inOrder.verify(cohort1).preCommit();
+ inOrder.verify(cohort1).commit();
+ inOrder.verify(cohort2).canCommit();
+ inOrder.verify(cohort2).preCommit();
+ inOrder.verify(cohort2).commit();
+ inOrder.verify(cohort3).canCommit();
+ inOrder.verify(cohort3).preCommit();
+ inOrder.verify(cohort3).commit();
// Verify data in the data store.
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
YangInstanceIdentifier path = TestModel.TEST_PATH;
shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
containerNode, true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Create a read Tx on the same chain.
waitUntilLeader(shard);
+ InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+ // Setup a simulated transactions with a mock cohort.
+
String transactionID = "tx";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ TestModel.TEST_PATH, containerNode, modification);
+
FiniteDuration duration = duration("5 seconds");
- // Send a BatchedModifications to start a transaction.
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
- NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+ InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
waitUntilLeader(shard);
- // Setup 2 mock cohorts. The first one fails in the commit phase.
+ // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+ // commit phase.
- final String transactionID1 = "tx1";
- final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
- final String transactionID2 = "tx2";
- final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ String transactionID2 = "tx2";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
- ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
- @Override
- public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
- DOMStoreThreePhaseCommitCohort actual) {
- return transactionID1.equals(transactionID) ? cohort1 : cohort2;
- }
- };
-
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Send BatchedModifications to start and ready each transaction.
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
- shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
waitUntilLeader(shard);
String transactionID = "tx1";
- final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
- ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
- @Override
- public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
- DOMStoreThreePhaseCommitCohort actual) {
- return cohort;
- }
- };
-
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
FiniteDuration duration = duration("5 seconds");
- // Send BatchedModifications to start and ready a transaction.
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
- shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
final FiniteDuration duration = duration("5 seconds");
String transactionID = "tx1";
- final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
- @Override
- public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
- DOMStoreThreePhaseCommitCohort actual) {
- return cohort;
- }
- };
-
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
- // Send BatchedModifications to start and ready a transaction.
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
- shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
}
};
- shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+ modification, preCommit);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
final FiniteDuration duration = duration("5 seconds");
+ InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- // Create and ready the 1st Tx - will timeout
+ // Create 1st Tx - will timeout
String transactionID1 = "tx1";
- shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
- TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+ modification1);
- // Create and ready the 2nd Tx
+ // Create 2nd Tx
- String transactionID2 = "tx2";
+ String transactionID2 = "tx3";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
- shard.tell(newBatchedModifications(transactionID2, listNodePath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+ listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+ modification2);
+
+ // Ready the Tx's
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
final FiniteDuration duration = duration("5 seconds");
+ InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
String transactionID1 = "tx1";
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
String transactionID2 = "tx2";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+ modification2);
+
String transactionID3 = "tx3";
+ MutableCompositeModification modification3 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
- // Send a BatchedModifications to start transactions and ready them.
+ // Ready the Tx's
- shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx.
// Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
- final String transactionID1 = "tx1";
- final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
- final String transactionID2 = "tx2";
- final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ String transactionID2 = "tx2";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
- @Override
- public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
- DOMStoreThreePhaseCommitCohort actual) {
- return transactionID1.equals(transactionID) ? cohort1 : cohort2;
- }
- };
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
- // Send BatchedModifications to start and ready each transaction.
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
-
- shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
- expectMsgClass(duration, BatchedModificationsReply.class);
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
}
@Test
- public void testOnReceiveReadyTransaction() throws Exception {
+ public void testOnReceiveBatchedModificationsReady() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testOnReceiveBatchedModificationsReady");
+
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.addModification(new WriteModification(writePath, writeData));
+
+ transaction.tell(batched, getRef());
+
+ expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceivePreLithiumReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
- "testReadyTransaction");
+ "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
- watch(transaction);
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
transaction.tell(new ReadyTransaction().toSerializable(), getRef());
- expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
- Terminated.class);
- expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
- Terminated.class);
+ expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
}};
// test
new JavaTestKit(getSystem()) {{
final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
- "testReadyTransaction2");
+ "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
- watch(transaction);
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
transaction.tell(new ReadyTransaction(), getRef());
- expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
- Terminated.class);
- expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
- Terminated.class);
+ expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
}};
}
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
}
- batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+ batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
// Tx 2 should've proceeded to find the primary shard.
verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
writeTx1.ready();
- verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
String tx2MemberName = "tx2MemberName";
doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
}
- readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+ readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
eqCreateTransaction(tx2MemberName, READ_WRITE));
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
+ expectBatchedModificationsReady(actorRef);
final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// This sends the batched modification.
transactionProxy.ready();
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
}
@Test(expected=IllegalStateException.class)
}
@Test
- public void testReadyWithReadWrite() throws Exception {
+ public void testReadWrite() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
eq(actorSelection(actorRef)), eqSerializedReadData());
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), false,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+ }
+
+ @Test
+ public void testReadyWithReadWrite() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
+
+ expectBatchedModificationsReady(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
isA(BatchedModifications.class));
+ }
- verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ @Test
+ public void testReadyWithNoModifications() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
+
+ expectBatchedModificationsReady(actorRef);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), true);
}
@Test
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModificationsReady(actorRef, 1);
+ expectBatchedModificationsReady(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModificationsReady(actorRef, 1);
+ expectBatchedModificationsReady(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
doReturn(true).when(mockActorContext).isPathLocal(anyString());
- doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), isA(BatchedModifications.class));
+ expectBatchedModificationsReady(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- // testing ready
- doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
-
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
expectBatchedModifications(actorRef, shardBatchedModificationCount);
- expectReadyTransaction(actorRef);
-
YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
- boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
- verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
+ verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
new DeleteModification(deletePath2));
}
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.inOrder;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.dispatch.Dispatchers;
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
cohort1, modification1, true), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Future;
/**
* Unit tests for backwards compatibility with pre-Lithium versions.
return argThat(matcher);
}
+ private Future<Object> readySerializedTxReply(String path, short version) {
+ return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
+ }
+
private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
(Serializable) new BatchedModificationsReply(100).toSerializable());
assertEquals("getNumBatched", 100, clone.getNumBatched());
- assertEquals("getCohortPath", null, clone.getCohortPath());
-
- clone = (BatchedModificationsReply) SerializationUtils.clone(
- (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
- assertEquals("getNumBatched", 50, clone.getNumBatched());
- assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for ReadyTransactionReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadyTransactionReplyTest {
+
+ @Test
+ public void testSerialization() {
+ String cohortPath = "cohort path";
+ ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ReadyTransactionReply.class, serialized.getClass());
+
+ ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+ (Serializable) serialized));
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+ assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+ }
+
+ @Test
+ public void testSerializationWithPreLithiumVersion() throws Exception {
+ String cohortPath = "cohort path";
+ ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath, DataStoreVersions.HELIUM_2_VERSION);
+
+ Object serialized = expected.toSerializable();
+ assertEquals("Serialized type", ShardTransactionMessages.ReadyTransactionReply.class, serialized.getClass());
+
+ ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+ (Serializable) serialized));
+ assertEquals("getVersion", DataStoreVersions.HELIUM_2_VERSION, actual.getVersion());
+ assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+ }
+}