import akka.actor.Status.Success;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
public class ShardManagerTest extends AbstractActorTest {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
+ private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+ private static final MemberName MEMBER_3 = MemberName.forName("member-3");
private static int ID_COUNTER = 1;
InMemorySnapshotStore.clear();
if(mockShardActor == null) {
- mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config");
+ mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
mockShardName.toString());
}
}
private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
- String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if(system == getSystem()) {
return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
}
private Props newPropsShardMgrWithMockShardActor() {
- return newTestShardMgrBuilderWithMockShardActor().props();
+ return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
+ Dispatchers.DefaultDispatcherId());
}
private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
final MockConfiguration mockConfig = new MockConfiguration() {
@Override
- public Collection<String> getMemberShardNames(String memberName) {
+ public Collection<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "topology");
}
@Override
- public Collection<String> getMembersFromShardName(String shardName) {
- return Arrays.asList("member-1");
+ public Collection<MemberName> getMembersFromShardName(String shardName) {
+ return members("member-1");
}
};
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
shardManager1.underlyingActor().waitForMemberUp();
-
shardManager1.tell(new FindPrimary("astronauts", false), getRef());
RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
shardManager1.underlyingActor().waitForUnreachableMember();
PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
- assertEquals("getMemberName", "member-2", peerDown.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(MockClusterWrapper.
shardManager1.underlyingActor().waitForReachableMember();
PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
- assertEquals("getMemberName", "member-2", peerUp.getMemberName());
+ assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(new FindPrimary("default", true), getRef());
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
- public List<String> getMemberShardNames(String memberName) {
+ public List<String> getMemberShardNames(MemberName memberName) {
return Arrays.asList("default", "astronauts");
}
}));
}};
}
+ private static List<MemberName> members(String... names) {
+ return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
+ }
+
@Test
public void testOnCreateShard() {
LOG.info("testOnCreateShard starting");
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
+ "foo", null, members("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
getPeerAddressResolver() instanceof ShardPeerAddressResolver);
- assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
- new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
- shardBuilder.getPeerAddresses().keySet());
- assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
+ assertEquals("peerMembers", Sets.newHashSet(
+ ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
+ ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
+ shardBuilder.getPeerAddresses().keySet());
+ assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-5", "member-6"));
+ "foo", null, members("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
Shard.Builder shardBuilder = Shard.builder();
ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
- "foo", null, Arrays.asList("member-1"));
+ "foo", null, members("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
expectMsgClass(duration("5 seconds"), Success.class);
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
- Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
- @Override
- public String apply(ShardSnapshot s) {
- return s.getName();
- }
- };
+ Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class).
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
+ new AddServerReply(ServerChangeStatus.OK, memberId2)).
withDispatcher(Dispatchers.DefaultDispatcherId()), name);
final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
mock(DataTree.class), leaderVersion), mockShardLeaderActor);
InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
//construct a mock response message
- AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
AddServer.class);
String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
- Props.create(MockRespondActor.class, addServerReply), leaderId);
+ Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
- shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());
Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
assertEquals("Failure obtained", true,
(resp.cause() instanceof PrimaryNotFoundException));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> respondActor =
- TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), respondActor);
- respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
- shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
+ shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
- assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
+ assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Success.class);
}};
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
+ String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
mock(DataTree.class), leaderVersion), mockShardLeaderActor);
leaderShardManager.underlyingActor().waitForMemberUp();
//construct a mock response message
- RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
- newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
+ newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
RemoveServer.class);
- String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString();
+ String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Status.Success.class);
}};
@Test
public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
- testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"),
- RemoveServer.class, new RemoveShardReplica("astronauts", "member-3"));
+ testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
+ RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
}
@Test
public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
- AddServer.class, new RemoveShardReplica("astronauts", "member-2"));
+ AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
}
// Removed the default shard replica from member-1
ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
- ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
+ ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix).build();
shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
- type(shardMrgIDSuffix).build().toString();
+ String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
MessageCollectorActor.props(), shardId);
put("shard1", Arrays.asList("member-1")).
put("shard2", Arrays.asList("member-1")).build());
- String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
- type(shardMrgIDSuffix).build().toString();
+ String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
- String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
- type(shardMrgIDSuffix).build().toString();
+ String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
LOG.info("testShutDown ending");
}
+ @Test
+ public void testChangeServersVotingStatus() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
+ respondActor, ChangeServersVotingStatus.class);
+ assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
+ ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"),
+ shardMrgIDSuffix).toString(), Boolean.TRUE));
+
+ expectMsgClass(duration("5 seconds"), Success.class);
+ }};
+ }
+
+ @Test
+ public void testChangeServersVotingStatusWithNoLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+
+ Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
+ assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
+ }};
+ }
+
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final CountDownLatch snapshotPersist = new CountDownLatch(1);
}
}
+ private void countDownIfOther(final Member member, CountDownLatch latch) {
+ if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
+ latch.countDown();
+ }
+ }
+
@Override
public void handleCommand(Object message) throws Exception {
try{
if(message instanceof FindPrimary) {
findPrimaryMessageReceived.countDown();
} else if(message instanceof ClusterEvent.MemberUp) {
- String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUpReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberUp)message).member(), memberUpReceived);
} else if(message instanceof ClusterEvent.MemberRemoved) {
- String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberRemovedReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.MemberRemoved)message).member(), memberRemovedReceived);
} else if(message instanceof ClusterEvent.UnreachableMember) {
- String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberUnreachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.UnreachableMember)message).member(), memberUnreachableReceived);
} else if(message instanceof ClusterEvent.ReachableMember) {
- String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
- if(!getCluster().getCurrentMemberName().equals(role)) {
- memberReachableReceived.countDown();
- }
+ countDownIfOther(((ClusterEvent.ReachableMember)message).member(), memberReachableReceived);
}
}
}
private static class MockRespondActor extends MessageCollectorActor {
static final String CLEAR_RESPONSE = "clear-response";
- static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
- private volatile Object responseMsg;
+ private Object responseMsg;
+ private final Class<?> requestClass;
@SuppressWarnings("unused")
- public MockRespondActor() {
- }
-
- @SuppressWarnings("unused")
- public MockRespondActor(Object responseMsg) {
+ public MockRespondActor(Class<?> requestClass, Object responseMsg) {
+ this.requestClass = requestClass;
this.responseMsg = responseMsg;
}
- public void updateResponse(Object response) {
- responseMsg = response;
- }
-
@Override
public void onReceive(Object message) throws Exception {
- if(!"get-all-messages".equals(message)) {
- LOG.debug("Received message : {}", message);
- }
- super.onReceive(message);
- if (message instanceof AddServer && responseMsg != null) {
- getSender().tell(responseMsg, getSelf());
- } else if(message instanceof RemoveServer && responseMsg != null){
- getSender().tell(responseMsg, getSelf());
- } else if(message.equals(CLEAR_RESPONSE)) {
+ if(message.equals(CLEAR_RESPONSE)) {
responseMsg = null;
+ } else {
+ super.onReceive(message);
+ if (message.getClass().equals(requestClass) && responseMsg != null) {
+ getSender().tell(responseMsg, getSelf());
+ }
}
}
}