Enable akka WeaklyUp feature 99/42799/3
authorTom Pantelis <tpanteli@brocade.com>
Fri, 29 Jul 2016 18:33:27 +0000 (14:33 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 5 Aug 2016 02:45:50 +0000 (02:45 +0000)
By enabling allow-weakly-up-members, akka will allow new nodes to join a
cluster if there are unreachable nodes. However, this only pertains to
new nodes that weren't previously in the cluster. Unfortunately it
doesn't pertain to node restarts where a node was in the cluster then
attempts to re-join with a new incarnation, which is what we really want.
Despite that, it will at least work for new nodes so I think it's worth
enabling. Akka might be further enhanced to broaden WeaklyUp to include
new incarnations (there's requests for that).

I also changed the ShardManager to handle MemberWeaklyUp events in
the same manner as MemberUp.

Change-Id: I5cf6c1967162b8a9bc6ffb59d34a50560699e4ca
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java

index b3c8bf645e16f0d2000b71afe917581822c760c4..10660fdb5a6bc37eef64e8424f0fbaaf785beb9e 100644 (file)
@@ -74,7 +74,7 @@ odl-cluster-data {
       seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
       seed-node-timeout = 12s
-      
       # Following is an excerpt from Akka Cluster Documentation
       # link - http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html
       # Warning - Akka recommends against using the auto-down feature of Akka Cluster in production.
@@ -83,6 +83,8 @@ odl-cluster-data {
 
       #auto-down-unreachable-after = 30s
 
+      allow-weakly-up-members = on
+
       roles = [
         "member-1"
       ]
index 4156ab34265f29376ef71519a4dab2c283c1fce8..c1bd716948296a7750444b458c617acb94c4afb8 100644 (file)
@@ -18,6 +18,7 @@ import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
@@ -195,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
         } else if (message instanceof ClusterEvent.MemberExited){
             memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -733,7 +736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -746,7 +749,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberExited(ClusterEvent.MemberExited message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -759,14 +762,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberUp(ClusterEvent.MemberUp message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        addPeerAddress(memberName, message.member().address());
+        memberUp(memberName, message.member().address());
+    }
 
+    private void memberUp(MemberName memberName, Address address) {
+        addPeerAddress(memberName, address);
         checkReady();
     }
 
+    private void memberWeaklyUp(MemberWeaklyUp message) {
+        MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberUp(memberName, message.member().address());
+    }
+
     private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
@@ -781,7 +796,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         MemberName memberName = memberToName(message.member());
-        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+        LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
 
@@ -790,7 +805,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
         MemberName memberName = memberToName(message.member());
-        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+        LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
     }