From 3a72e9cb6f6b0a17dcf08a3e87772c22a191d580 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 29 Jul 2016 14:33:27 -0400 Subject: [PATCH] Enable akka WeaklyUp feature By enabling allow-weakly-up-members, akka will allow new nodes to join a cluster if there are unreachable nodes. However, this only pertains to new nodes that weren't previously in the cluster. Unfortunately it doesn't pertain to node restarts where a node was in the cluster then attempts to re-join with a new incarnation, which is what we really want. Despite that, it will at least work for new nodes so I think it's worth enabling. Akka might be further enhanced to broaden WeaklyUp to include new incarnations (there's requests for that). I also changed the ShardManager to handle MemberWeaklyUp events in the same manner as MemberUp. Change-Id: I5cf6c1967162b8a9bc6ffb59d34a50560699e4ca Signed-off-by: Tom Pantelis --- .../main/resources/initial/factory-akka.conf | 4 ++- .../datastore/shardmanager/ShardManager.java | 27 ++++++++++++++----- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf index b3c8bf645e..10660fdb5a 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf @@ -74,7 +74,7 @@ odl-cluster-data { seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"] seed-node-timeout = 12s - + # Following is an excerpt from Akka Cluster Documentation # link - http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html # Warning - Akka recommends against using the auto-down feature of Akka Cluster in production. @@ -83,6 +83,8 @@ odl-cluster-data { #auto-down-unreachable-after = 30s + allow-weakly-up-members = on + roles = [ "member-1" ] diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 4156ab3426..c1bd716948 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -18,6 +18,7 @@ import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.Member; import akka.dispatch.Futures; import akka.dispatch.OnComplete; @@ -195,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); + } else if (message instanceof ClusterEvent.MemberWeaklyUp){ + memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); } else if (message instanceof ClusterEvent.MemberExited){ memberExited((ClusterEvent.MemberExited) message); } else if(message instanceof ClusterEvent.MemberRemoved) { @@ -733,7 +736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberRemoved(ClusterEvent.MemberRemoved message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -746,7 +749,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberExited(ClusterEvent.MemberExited message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -759,14 +762,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberUp(ClusterEvent.MemberUp message) { MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - addPeerAddress(memberName, message.member().address()); + memberUp(memberName, message.member().address()); + } + private void memberUp(MemberName memberName, Address address) { + addPeerAddress(memberName, address); checkReady(); } + private void memberWeaklyUp(MemberWeaklyUp message) { + MemberName memberName = memberToName(message.member()); + + LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + memberUp(memberName, message.member().address()); + } + private void addPeerAddress(MemberName memberName, Address address) { peerAddressResolver.addPeerAddress(memberName, address); @@ -781,7 +796,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberReachable(ClusterEvent.ReachableMember message) { MemberName memberName = memberToName(message.member()); - LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); addPeerAddress(memberName, message.member().address()); @@ -790,7 +805,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void memberUnreachable(ClusterEvent.UnreachableMember message) { MemberName memberName = memberToName(message.member()); - LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); markMemberUnavailable(memberName); } -- 2.36.6