X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=8386e669b83f46cb102771f7c12f1b9458f5e485;hp=4156ab34265f29376ef71519a4dab2c283c1fce8;hb=4639f61a41a93d6a762af97b819d164781b0f9f8;hpb=15d2db9a4b59b1e4c6a6d227d82d2595a061c6ba diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 4156ab3426..8386e669b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -18,6 +18,7 @@ import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.Member; import akka.dispatch.Futures; import akka.dispatch.OnComplete; @@ -195,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); + } else if (message instanceof ClusterEvent.MemberWeaklyUp){ + memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); } else if (message instanceof ClusterEvent.MemberExited){ memberExited((ClusterEvent.MemberExited) message); } else if(message instanceof ClusterEvent.MemberRemoved) { @@ -304,7 +307,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { - shardReplicaOperationsInProgress.remove(shardId); + shardReplicaOperationsInProgress.remove(shardId.getShardName()); LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); @@ -733,7 +736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberRemoved(ClusterEvent.MemberRemoved message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -746,7 +749,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberExited(ClusterEvent.MemberExited message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -759,14 +762,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberUp(ClusterEvent.MemberUp message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - addPeerAddress(memberName, message.member().address()); + memberUp(memberName, message.member().address()); + } + private void memberUp(MemberName memberName, Address address) { + addPeerAddress(memberName, address); checkReady(); } + private void memberWeaklyUp(MemberWeaklyUp message) { + MemberName memberName = memberToName(message.member()); + + LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + memberUp(memberName, message.member().address()); + } + private void addPeerAddress(MemberName memberName, Address address) { peerAddressResolver.addPeerAddress(memberName, address); @@ -781,7 +796,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberReachable(ClusterEvent.ReachableMember message) { MemberName memberName = memberToName(message.member()); - LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); addPeerAddress(memberName, message.member().address()); @@ -790,7 +805,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberUnreachable(ClusterEvent.UnreachableMember message) { MemberName memberName = memberToName(message.member()); - LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); markMemberUnavailable(memberName); } @@ -1079,12 +1094,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new RunnableMessage() { - @Override - public void run() { - addShard(getShardName(), response, getSender()); - } - }, getTargetActor()); + getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor()); } @Override @@ -1241,12 +1251,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void doRemoveShardReplicaAsync(final String primaryPath) { - getSelf().tell(new RunnableMessage() { - @Override - public void run() { - removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()); - } - }, getTargetActor()); + getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor()); } }); } @@ -1365,12 +1370,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String.format("Failed to find local shard %s", shardName), failure)), self()); } else { if(response instanceof LocalShardFound) { - getSelf().tell(new RunnableMessage() { - @Override - public void run() { - onLocalShardFound.accept((LocalShardFound) response); - } - }, sender); + getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender); } else if(response instanceof LocalShardNotFound) { String msg = String.format("Local shard %s does not exist", shardName); LOG.debug ("{}: {}", persistenceId, msg);