Merge "BUG 1595 - Clustering : NPE on startup"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 2320789d594aa56c8f7d4ff1ca4f99525b84ca95..f6ce5e55f3ee63602fc92529e92d6e93d0ff9bb3 100644 (file)
@@ -8,17 +8,20 @@
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.UntypedActor;
 import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
 import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.pattern.Patterns;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -60,12 +63,12 @@ public class Gossiper extends UntypedActor {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-    Cluster cluster = Cluster.get(getContext().system());
+    private Cluster cluster;
 
     /**
      * ActorSystem's address for the current cluster node.
      */
-    private Address selfAddress = cluster.selfAddress();
+    private Address selfAddress;
 
     /**
      * All known cluster members
@@ -89,16 +92,21 @@ public class Gossiper extends UntypedActor {
 
     @Override
     public void preStart(){
-
-        cluster.subscribe(getSelf(),
-                          ClusterEvent.initialStateAsEvents(),
-                          ClusterEvent.MemberEvent.class,
-                          ClusterEvent.UnreachableMember.class);
+        ActorRefProvider provider = getContext().provider();
+        selfAddress = provider.getDefaultAddress();
+
+        if ( provider instanceof ClusterActorRefProvider ) {
+            cluster = Cluster.get(getContext().system());
+            cluster.subscribe(getSelf(),
+                    ClusterEvent.initialStateAsEvents(),
+                    ClusterEvent.MemberEvent.class,
+                    ClusterEvent.UnreachableMember.class);
+        }
 
         if (autoStartGossipTicks) {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
-                    new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+                    ActorUtil.GOSSIP_TICK_INTERVAL,                 //interval
                     getSelf(),                                       //target
                     new Messages.GossiperMessages.GossipTick(),      //message
                     getContext().dispatcher(),                       //execution context
@@ -220,7 +228,9 @@ public class Gossiper extends UntypedActor {
             return;
 
         final ActorRef sender = getSender();
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
 
     }
@@ -260,7 +270,8 @@ public class Gossiper extends UntypedActor {
      */
     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
 
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
     }
 
@@ -272,7 +283,10 @@ public class Gossiper extends UntypedActor {
     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
 
         //Get local status from bucket store and send to remote
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+
+        //Find gossiper on remote system
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());