/**
* This method is overridden to ensure the previous Tx's ready operations complete
- * before we create the next shard Tx in the chain to avoid creation failures if the
+ * before we initiate the next Tx in the chain to avoid creation failures if the
* previous Tx's ready operations haven't completed yet.
*/
@Override
- protected Future<Object> sendCreateTransaction(final ActorSelection shard,
- final Object serializedCreateMessage) {
-
+ protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
// Check if there are any previous ready Futures, otherwise let the super class handle it.
if(previousReadyFutures.isEmpty()) {
- return super.sendCreateTransaction(shard, serializedCreateMessage);
+ return super.sendFindPrimaryShardAsync(shardName);
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+ previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
}
// Combine the ready Futures into 1.
Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
- previousReadyFutures, getActorContext().getActorSystem().dispatcher());
+ previousReadyFutures, getActorContext().getClientDispatcher());
// Add a callback for completion of the combined Futures.
- final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+ final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
@Override
public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
if(failure != null) {
// A Ready Future failed so fail the returned Promise.
- createTxPromise.failure(failure);
+ returnPromise.failure(failure);
} else {
- LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
+ LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
getIdentifier(), getTransactionChainId());
- // Send the CreateTx message and use the resulting Future to complete the
+ // Send the FindPrimaryShard message and use the resulting Future to complete the
// returned Promise.
- createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
- serializedCreateMessage));
+ returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
}
}
};
- combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
+ combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
- return createTxPromise.future();
+ return returnPromise.future();
}
}
}
} else {
// Throwing an exception here will fail the Future.
- throw new IllegalArgumentException(String.format("Invalid reply type %s",
- serializedReadyReply.getClass()));
+ throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+ identifier, serializedReadyReply.getClass()));
}
}
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
}
- /**
- * Method called to send a CreateTransaction message to a shard.
- *
- * @param shard the shard actor to send to
- * @param serializedCreateMessage the serialized message to send
- * @return the response Future
- */
- protected Future<Object> sendCreateTransaction(ActorSelection shard,
- Object serializedCreateMessage) {
- return actorContext.executeOperationAsync(shard, serializedCreateMessage);
- }
-
@Override
public Object getIdentifier() {
return this.identifier;
return ShardStrategyFactory.getStrategy(path).findShard(path);
}
+ protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+ return actorContext.findPrimaryShardAsync(shardName);
+ }
+
private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
String shardName = shardNameFromIdentifier(path);
TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
if(txFutureCallback == null) {
- Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
+ Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
- final TransactionFutureCallback newTxFutureCallback =
- new TransactionFutureCallback(shardName);
+ final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
txFutureCallback = newTxFutureCallback;
txFutureCallbackMap.put(shardName, txFutureCallback);
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
- new CreateTransaction(identifier.toString(),
- TransactionProxy.this.transactionType.ordinal(),
- getTransactionChainId()).toSerializable());
+ Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
+ TransactionProxy.this.transactionType.ordinal(),
+ getTransactionChainId()).toSerializable();
+
+ Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
createTxFuture.onComplete(this, actorContext.getClientDispatcher());
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
* @author Thomas Pantelis
*/
public abstract class AbstractTransactionProxyTest {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
private static ActorSystem system;
private final Configuration configuration = new MockConfiguration();
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ log.info("Created mock shard actor {}", actorRef);
+
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
TransactionType type, int transactionVersion) {
- ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+ ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
- doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
- eqCreateTransaction(memberName, type));
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
+ memberName, shardActorRef);
+ }
- return actorRef;
+ protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+ TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
+
+ ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ log.info("Created mock shard Tx actor {}", txActorRef);
+
+ doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection(
+ txActorRef.path().toString());
+
+ doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(prefix, type));
+
+ return txActorRef;
}
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
assertEquals("Cohort actor path", expReply, actual);
} else {
- // Expecting exception.
try {
Await.result(future, Duration.create(5, TimeUnit.SECONDS));
fail("Expected exception from ready operation Future");
} catch(Exception e) {
- // Expected
+ assertTrue(String.format("Expected exception type %s. Actual %s",
+ expReply, e.getClass()), ((Class<?>)expReply).isInstance(e));
}
}
}
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import akka.actor.ActorRef;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Promise;
-public class TransactionChainProxyTest extends AbstractActorTest{
- ActorContext actorContext = null;
- SchemaContext schemaContext = mock(SchemaContext.class);
-
- @Mock
- ActorContext mockActorContext;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- actorContext = new MockActorContext(getSystem());
- actorContext.setSchemaContext(schemaContext);
-
- doReturn(schemaContext).when(mockActorContext).getSchemaContext();
- doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
- }
+public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
@SuppressWarnings("resource")
@Test
public void testNewReadOnlyTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction();
+ DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
}
@SuppressWarnings("resource")
@Test
public void testNewReadWriteTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction();
+ DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
}
@SuppressWarnings("resource")
@Test
public void testNewWriteOnlyTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction();
+ DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
}
@Test
public void testClose() throws Exception {
- ActorContext context = mock(ActorContext.class);
+ new TransactionChainProxy(mockActorContext).close();
- new TransactionChainProxy(context).close();
-
- verify(context, times(1)).broadcast(anyObject());
+ verify(mockActorContext, times(1)).broadcast(anyObject());
}
@Test
verify(mockActorContext, times(0)).acquireTxCreationPermit();
}
+
+ /**
+ * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
+ * initiated until the first one completes its read future.
+ */
+ @Test
+ public void testChainedReadWriteTransactions() throws Exception {
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ expectBatchedModifications(txActorRef1, 1);
+
+ Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
+ doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
+
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+ writeTx1.ready();
+
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+
+ String tx2MemberName = "tx2MemberName";
+ doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
+ ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+ ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
+ DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
+
+ expectBatchedModifications(txActorRef2, 1);
+
+ final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
+
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch write2Complete = new CountDownLatch(1);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ write2Complete.countDown();
+ }
+ }
+ }.start();
+
+ assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ try {
+ verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
+ eqCreateTransaction(tx2MemberName, READ_WRITE));
+ } catch (AssertionError e) {
+ fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+ }
+
+ readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+
+ verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
+ eqCreateTransaction(tx2MemberName, READ_WRITE));
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ expectBatchedModifications(actorRef, 1);
+
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+ NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ txChainProxy.newWriteOnlyTransaction();
+ }
}