import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
InMemorySnapshotStore.clear();
if(mockShardActor == null) {
- mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, MEMBER_1, "config");
+ mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
mockShardName.toString());
}
}
private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
- String name = new ShardIdentifier(shardName, MemberName.forName(memberName), "config").toString();
+ String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if(system == getSystem()) {
return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
}
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
getPeerAddressResolver() instanceof ShardPeerAddressResolver);
assertEquals("peerMembers", Sets.newHashSet(
- new ShardIdentifier("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
- new ShardIdentifier("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
+ ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
+ ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
shardBuilder.getPeerAddresses().keySet());
- assertEquals("ShardIdentifier", new ShardIdentifier("foo", MEMBER_1, shardMrgIDSuffix),
+ assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("astronauts", MEMBER_2, "config").toString();
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class).
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
+ new AddServerReply(ServerChangeStatus.OK, memberId2)).
withDispatcher(Dispatchers.DefaultDispatcherId()), name);
final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
mock(DataTree.class), leaderVersion), mockShardLeaderActor);
InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
//construct a mock response message
- AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
AddServer.class);
String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
- Props.create(MockRespondActor.class, addServerReply), leaderId);
+ Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> respondActor =
- TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), respondActor);
- respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
- assertEquals(new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(),
+ assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Success.class);
}};
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("default", MEMBER_2, shardMrgIDSuffix).toString();
+ String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
mock(DataTree.class), leaderVersion), mockShardLeaderActor);
leaderShardManager.underlyingActor().waitForMemberUp();
//construct a mock response message
- RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
RemoveServer.class);
- String removeServerId = new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString();
+ String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Status.Success.class);
}};
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- String shardId = ShardIdentifier.builder().shardName("default").memberName(MEMBER_1).
- type(shardMrgIDSuffix).build().toString();
+ String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
MessageCollectorActor.props(), shardId);
put("shard1", Arrays.asList("member-1")).
put("shard2", Arrays.asList("member-1")).build());
- String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName(MEMBER_1).
- type(shardMrgIDSuffix).build().toString();
+ String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
- String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName(MEMBER_1).
- type(shardMrgIDSuffix).build().toString();
+ String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
LOG.info("testShutDown ending");
}
+ @Test
+ public void testChangeServersVotingStatus() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
+ respondActor, ChangeServersVotingStatus.class);
+ assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
+ ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"),
+ shardMrgIDSuffix).toString(), Boolean.TRUE));
+
+ expectMsgClass(duration("5 seconds"), Success.class);
+ }};
+ }
+
+ @Test
+ public void testChangeServersVotingStatusWithNoLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+
+ Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
+ assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
+ }};
+ }
+
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final CountDownLatch snapshotPersist = new CountDownLatch(1);
private static class MockRespondActor extends MessageCollectorActor {
static final String CLEAR_RESPONSE = "clear-response";
- static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
- private volatile Object responseMsg;
+ private Object responseMsg;
+ private final Class<?> requestClass;
@SuppressWarnings("unused")
- public MockRespondActor() {
- }
-
- @SuppressWarnings("unused")
- public MockRespondActor(Object responseMsg) {
+ public MockRespondActor(Class<?> requestClass, Object responseMsg) {
+ this.requestClass = requestClass;
this.responseMsg = responseMsg;
}
- public void updateResponse(Object response) {
- responseMsg = response;
- }
-
@Override
public void onReceive(Object message) throws Exception {
- if(!"get-all-messages".equals(message)) {
- LOG.debug("Received message : {}", message);
- }
- super.onReceive(message);
- if (message instanceof AddServer && responseMsg != null) {
- getSender().tell(responseMsg, getSelf());
- } else if(message instanceof RemoveServer && responseMsg != null){
- getSender().tell(responseMsg, getSelf());
- } else if(message.equals(CLEAR_RESPONSE)) {
+ if(message.equals(CLEAR_RESPONSE)) {
responseMsg = null;
+ } else {
+ super.onReceive(message);
+ if (message.getClass().equals(requestClass) && responseMsg != null) {
+ getSender().tell(responseMsg, getSelf());
+ }
}
}
}