import com.google.common.base.Preconditions;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
+public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
private final ShardDataTreeTransactionParent parent;
protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final String id, final DataTreeModification modification) {
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionID(), ready.getTxnClientVersion());
- CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort());
+ ShardDataTreeCohort cohort = ready.getTransaction().ready();
+ CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
LOG.debug("readyTransaction : {}", transactionID);
- ShardDataTreeCohort cohort = transaction.ready();
-
getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
- cohort, returnSerialized, doImmediateCommit), getContext());
+ transaction, returnSerialized, doImmediateCommit), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
*/
package org.opendaylight.controller.cluster.datastore.messages;
-import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.ReadWriteShardDataTreeTransaction;
/**
* Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction.
*/
public class ForwardedReadyTransaction {
private final String transactionID;
- private final ShardDataTreeCohort cohort;
+ private final ReadWriteShardDataTreeTransaction transaction;
private final boolean returnSerialized;
private final boolean doImmediateCommit;
private final short txnClientVersion;
public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
- ShardDataTreeCohort cohort, boolean returnSerialized,
+ ReadWriteShardDataTreeTransaction transaction, boolean returnSerialized,
boolean doImmediateCommit) {
this.transactionID = transactionID;
- this.cohort = cohort;
+ this.transaction = Preconditions.checkNotNull(transaction);
this.returnSerialized = returnSerialized;
this.txnClientVersion = txnClientVersion;
this.doImmediateCommit = doImmediateCommit;
return transactionID;
}
- public ShardDataTreeCohort getCohort() {
- return cohort;
+ public ReadWriteShardDataTreeTransaction getTransaction() {
+ return transaction;
}
public boolean isReturnSerialized() {
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
MutableCompositeModification modification,
boolean doCommitOnReady) {
if(remoteReadWriteTransaction){
- return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, cohort, true, doCommitOnReady);
+ return prepareForwardedReadyTransaction(cohort, transactionID, CURRENT_VERSION,
+ doCommitOnReady);
} else {
setupCohortDecorator(shard, cohort);
return prepareBatchedModifications(transactionID, modification, doCommitOnReady);
}
}
+ protected Object prepareForwardedReadyTransaction(ShardDataTreeCohort cohort, String transactionID,
+ short version, boolean doCommitOnReady) {
+ ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
+ doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
+ doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class));
+ return new ForwardedReadyTransaction(transactionID, version,
+ new ReadWriteShardDataTreeTransaction(mockParent, transactionID,
+ mock(DataTreeModification.class)), true, doCommitOnReady);
+ }
+
protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
String transactionID,
MutableCompositeModification modification) {
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
- cohort1, true, false), getRef());
+ shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
- cohort2, true, false), getRef());
+ shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
- cohort3, true, false), getRef());
+ shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and