import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReadyLocalTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
((ReadyTransactionReply)resp).getCohortPath());
+ Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+ Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
- leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+ leaderDistributedDataStore.getActorContext(), Arrays.asList(
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
}
+ @SuppressWarnings("unchecked")
@Test
public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true, true);
+ Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
forwardedReady = new ForwardedReadyTransaction("tx-2",
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), true, false);
+ Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), false);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
((ReadyTransactionReply)resp).getCohortPath());
+ Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+ Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
- leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+ leaderDistributedDataStore.getActorContext(), Arrays.asList(
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
// Gracefully stop the leader via a Shutdown message.
+ sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+ shardElectionTimeoutFactor(100));
+
FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
ActorRef leaderActor = Await.result(future, duration);
- Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+ Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
// Commit the 2 transactions. They should finish and succeed.