import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
onActorInitialized(message);
} else if (message instanceof ClusterEvent.MemberUp){
memberUp((ClusterEvent.MemberUp) message);
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+ memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
} else if (message instanceof ClusterEvent.MemberExited){
memberExited((ClusterEvent.MemberExited) message);
} else if(message instanceof ClusterEvent.MemberRemoved) {
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
- shardReplicaOperationsInProgress.remove(shardId);
+ shardReplicaOperationsInProgress.remove(shardId.getShardName());
LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
private void memberRemoved(ClusterEvent.MemberRemoved message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
private void memberExited(ClusterEvent.MemberExited message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
private void memberUp(ClusterEvent.MemberUp message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
- addPeerAddress(memberName, message.member().address());
+ memberUp(memberName, message.member().address());
+ }
+ private void memberUp(MemberName memberName, Address address) {
+ addPeerAddress(memberName, address);
checkReady();
}
+ private void memberWeaklyUp(MemberWeaklyUp message) {
+ MemberName memberName = memberToName(message.member());
+
+ LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
+ memberUp(memberName, message.member().address());
+ }
+
private void addPeerAddress(MemberName memberName, Address address) {
peerAddressResolver.addPeerAddress(memberName, address);
private void memberReachable(ClusterEvent.ReachableMember message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+ LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
addPeerAddress(memberName, message.member().address());
private void memberUnreachable(ClusterEvent.UnreachableMember message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+ LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
markMemberUnavailable(memberName);
}
findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new RunnableMessage() {
- @Override
- public void run() {
- addShard(getShardName(), response, getSender());
- }
- }, getTargetActor());
+ getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor());
}
@Override
}
private void doRemoveShardReplicaAsync(final String primaryPath) {
- getSelf().tell(new RunnableMessage() {
- @Override
- public void run() {
- removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
- }
- }, getTargetActor());
+ getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor());
}
});
}
String.format("Failed to find local shard %s", shardName), failure)), self());
} else {
if(response instanceof LocalShardFound) {
- getSelf().tell(new RunnableMessage() {
- @Override
- public void run() {
- onLocalShardFound.accept((LocalShardFound) response);
- }
- }, sender);
+ getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender);
} else if(response instanceof LocalShardNotFound) {
String msg = String.format("Local shard %s does not exist", shardName);
LOG.debug ("{}: {}", persistenceId, msg);