import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
*
* @author Thomas Pantelis
*/
-public class DistributedDataStoreRemotingIntegrationTest {
+public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
private static final String[] CARS = {"cars"};
private final DatastoreContext.Builder followerDatastoreContextBuilder =
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+ private final TransactionIdentifier tx1 = nextTransactionId();
+ private final TransactionIdentifier tx2 = nextTransactionId();
private DistributedDataStore followerDistributedDataStore;
private DistributedDataStore leaderDistributedDataStore;
shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
- ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true);
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
- readyLocal = new ReadyLocalTransaction("tx-2" , modification, false);
+ readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
- new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
- ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
+ ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
- forwardedReady = new ForwardedReadyTransaction("tx-2",
+ forwardedReady = new ForwardedReadyTransaction(tx2,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), false);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
- new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
}
});
+ MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertEquals("getLastApplied", 0, raftState.getLastApplied());
+ }
+ });
+
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
shardElectionTimeoutFactor(10));
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
// Submit all tx's - the messages should get queued for retry.
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));