Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Bug 5450: Query akka cluster state on Follower ElectionTimeout
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
test
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
DistributedDataStoreRemotingIntegrationTest.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
index 0c7575a61ddfeb6521b3ec23612245bbc2662cd9..2f7c790269f0f9702157b6468234154e9e1cb43d 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
@@
-43,6
+43,7
@@
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
@@
-99,7
+100,7
@@
import scala.concurrent.duration.FiniteDuration;
*
* @author Thomas Pantelis
*/
*
* @author Thomas Pantelis
*/
-public class DistributedDataStoreRemotingIntegrationTest {
+public class DistributedDataStoreRemotingIntegrationTest
extends AbstractTest
{
private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
private static final String[] CARS = {"cars"};
private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
private static final String[] CARS = {"cars"};
@@
-121,6
+122,8
@@
public class DistributedDataStoreRemotingIntegrationTest {
private final DatastoreContext.Builder followerDatastoreContextBuilder =
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
private final DatastoreContext.Builder followerDatastoreContextBuilder =
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+ private final TransactionIdentifier tx1 = nextTransactionId();
+ private final TransactionIdentifier tx2 = nextTransactionId();
private DistributedDataStore followerDistributedDataStore;
private DistributedDataStore leaderDistributedDataStore;
private DistributedDataStore followerDistributedDataStore;
private DistributedDataStore leaderDistributedDataStore;
@@
-141,6
+144,13
@@
public class DistributedDataStoreRemotingIntegrationTest {
@After
public void tearDown() {
@After
public void tearDown() {
+ if (followerDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+ if (leaderDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+
JavaTestKit.shutdownActorSystem(leaderSystem);
JavaTestKit.shutdownActorSystem(followerSystem);
JavaTestKit.shutdownActorSystem(follower2System);
JavaTestKit.shutdownActorSystem(leaderSystem);
JavaTestKit.shutdownActorSystem(followerSystem);
JavaTestKit.shutdownActorSystem(follower2System);
@@
-517,6
+527,7
@@
public class DistributedDataStoreRemotingIntegrationTest {
shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
@@
-568,7
+579,7
@@
public class DistributedDataStoreRemotingIntegrationTest {
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
- ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(
"tx-1"
, modification, true);
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(
tx1
, modification, true);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
@@
-587,7
+598,7
@@
public class DistributedDataStoreRemotingIntegrationTest {
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
- readyLocal = new ReadyLocalTransaction(
"tx-2"
, modification, false);
+ readyLocal = new ReadyLocalTransaction(
tx2
, modification, false);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
@@
-604,7
+615,7
@@
public class DistributedDataStoreRemotingIntegrationTest {
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
- new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)),
"tx-2"
);
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)),
tx2
);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
@@
-633,9
+644,9
@@
public class DistributedDataStoreRemotingIntegrationTest {
MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
- ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(
"tx-1"
,
+ ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(
tx1
,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class),
"tx-1"
, modification), true);
+ Mockito.mock(ShardDataTreeTransactionParent.class),
tx1
, modification), true);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
@@
-653,9
+664,9
@@
public class DistributedDataStoreRemotingIntegrationTest {
MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
- forwardedReady = new ForwardedReadyTransaction(
"tx-2"
,
+ forwardedReady = new ForwardedReadyTransaction(
tx2
,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class),
"tx-2"
, modification), false);
+ Mockito.mock(ShardDataTreeTransactionParent.class),
tx2
, modification), false);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
@@
-672,7
+683,7
@@
public class DistributedDataStoreRemotingIntegrationTest {
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
- new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)),
"tx-2"
);
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)),
tx2
);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
@@
-702,6
+713,13
@@
public class DistributedDataStoreRemotingIntegrationTest {
}
});
}
});
+ MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertEquals("getLastApplied", 0, raftState.getLastApplied());
+ }
+ });
+
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
@@
-764,6
+782,7
@@
public class DistributedDataStoreRemotingIntegrationTest {
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
shardElectionTimeoutFactor(10));
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
shardElectionTimeoutFactor(10));
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
// Submit all tx's - the messages should get queued for retry.
leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
// Submit all tx's - the messages should get queued for retry.
@@
-940,6
+959,8
@@
public class DistributedDataStoreRemotingIntegrationTest {
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
@@
-980,6
+1001,8
@@
public class DistributedDataStoreRemotingIntegrationTest {
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));