import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
- final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
- protected Props newShardPropsWithRecoveryComplete() {
-
- final Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- @Override
- protected void onRecoveryComplete() {
- try {
- super.onRecoveryComplete();
- } finally {
- recoveryComplete.countDown();
- }
- }
- };
- }
- };
- return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
- }
-
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
@Test
public void testApplySnapshot() throws Exception {
+
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
+ testkit.waitUntilLeader(shard);
+
final DataTree store = InMemoryDataTreeFactory.getInstance().create();
store.setSchemaContext(SCHEMA_CONTEXT);
@Test
public void testApplyState() throws Exception {
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+ testkit.waitUntilLeader(shard);
+
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@Test
public void testApplyStateWithCandidatePayload() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
- recoveryComplete.await(5, TimeUnit.SECONDS);
+ testkit.waitUntilLeader(shard);
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
}
@Test
- public void testAbortTransaction() throws Throwable {
+ public void testAbortCurrentTransaction() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortTransaction");
+ "testAbortCurrentTransaction");
waitUntilLeader(shard);
}};
}
+ @Test
+ public void testAbortQueuedTransaction() throws Throwable {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {{
+ final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
+ @SuppressWarnings("serial")
+ final Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ super.onReceiveCommand(message);
+ if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ if(cleaupCheckLatch.get() != null) {
+ cleaupCheckLatch.get().countDown();
+ }
+ }
+ }
+ };
+ }
+ };
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+ Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+ doReturn(Futures.immediateFuture(null)).when(cohort).abort();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready the tx.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Send the AbortTransaction message.
+
+ shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ verify(cohort).abort();
+
+ // Verify the tx cohort is removed from queue at the cleanup check interval.
+
+ cleaupCheckLatch.set(new CountDownLatch(1));
+ assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+ cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Now send CanCommitTransaction - should fail.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+ Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+ assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCreateSnapshot() throws Exception {
testCreateSnapshot(true, "testCreateSnapshot");