BUG-3128: rework sal-remoterpc-connector
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 04c767e02142ce315cd5b10b1f01a8f24ed7da9f..db5004145260ffb7c5bc88e85837332e2cd51bc3 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
 import akka.actor.Address;
 import akka.actor.Cancellable;
+import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
@@ -21,6 +22,7 @@ import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,12 +35,11 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -61,41 +62,38 @@ import scala.concurrent.duration.FiniteDuration;
  * for update.
  */
 public class Gossiper extends AbstractUntypedActorWithMetering {
+    private final boolean autoStartGossipTicks;
+    private final RemoteRpcProviderConfig config;
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private Cluster cluster;
+    /**
+     * All known cluster members.
+     */
+    private final List<Address> clusterMembers = new ArrayList<>();
 
     /**
      * ActorSystem's address for the current cluster node.
      */
     private Address selfAddress;
 
-    /**
-     * All known cluster members.
-     */
-    private List<Address> clusterMembers = new ArrayList<>();
+    private Cluster cluster;
 
     private Cancellable gossipTask;
 
-    private Boolean autoStartGossipTicks = true;
+    Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+        this.config = Preconditions.checkNotNull(config);
+        this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+    }
 
-    private final RemoteRpcProviderConfig config;
+    Gossiper(final RemoteRpcProviderConfig config) {
+        this(config, Boolean.TRUE);
+    }
 
-    public Gossiper(RemoteRpcProviderConfig config) {
-        this.config = Preconditions.checkNotNull(config);
+    public static Props props(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config);
     }
 
-    /**
-     * Constructor for testing.
-     *
-     * @param autoStartGossipTicks used for turning off gossip ticks during testing.
-     *                             Gossip tick can be manually sent.
-     */
-    @VisibleForTesting
-    public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
-        this(config);
-        this.autoStartGossipTicks = autoStartGossipTicks;
+    static Props testProps(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config, Boolean.FALSE);
     }
 
     @Override
@@ -103,7 +101,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider ) {
+        if (provider instanceof ClusterActorRefProvider ) {
             cluster = Cluster.get(getContext().system());
             cluster.subscribe(getSelf(),
                     ClusterEvent.initialStateAsEvents(),
@@ -116,10 +114,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
                     config.getGossipTickInterval(),                 //interval
-                    getSelf(),                                       //target
-                    new Messages.GossiperMessages.GossipTick(),      //message
-                    getContext().dispatcher(),                       //execution context
-                    getSelf()                                        //sender
+                    getSelf(),                                      //target
+                    new Messages.GossiperMessages.GossipTick(),     //message
+                    getContext().dispatcher(),                      //execution context
+                    getSelf()                                       //sender
             );
         }
     }
@@ -136,7 +134,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
-    protected void handleReceive(Object message) throws Exception {
+    protected void handleReceive(final Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
         if (message instanceof GossipTick) {
@@ -158,7 +156,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         } else if (message instanceof ClusterEvent.MemberRemoved) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
 
-        } else if ( message instanceof ClusterEvent.UnreachableMember) {
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
 
         } else {
@@ -171,7 +169,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param member who went down
      */
-    void receiveMemberRemoveOrUnreachable(Member member) {
+    private void receiveMemberRemoveOrUnreachable(final Member member) {
         //if its self, then stop itself
         if (selfAddress.equals(member.address())) {
             getContext().stop(getSelf());
@@ -179,7 +177,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         }
 
         clusterMembers.remove(member.address());
-        log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+
+        getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
     }
 
     /**
@@ -187,10 +187,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param member the member to add
      */
-    void receiveMemberUpOrReachable(final Member member) {
-
+    private void receiveMemberUpOrReachable(final Member member) {
+        //ignore up notification for self
         if (selfAddress.equals(member.address())) {
-            //ignore up notification for self
             return;
         }
 
@@ -198,7 +197,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             clusterMembers.add(member.address());
         }
 
-        log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
     }
 
     /**
@@ -208,21 +207,23 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
      */
+    @VisibleForTesting
     void receiveGossipTick() {
-        if (clusterMembers.size() == 0) {
-            return; //no members to send gossip status to
-        }
-
-        Address remoteMemberToGossipTo;
-
-        if (clusterMembers.size() == 1) {
-            remoteMemberToGossipTo = clusterMembers.get(0);
-        } else {
-            Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
-            remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+        final Address remoteMemberToGossipTo;
+        switch (clusterMembers.size()) {
+            case 0:
+                //no members to send gossip status to
+                return;
+            case 1:
+                remoteMemberToGossipTo = clusterMembers.get(0);
+                break;
+            default:
+                final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+                remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+                break;
         }
 
-        log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
+        LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
         getLocalStatusAndSendTo(remoteMemberToGossipTo);
     }
 
@@ -239,7 +240,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param status bucket versions from a remote member
      */
-    void receiveGossipStatus(GossipStatus status) {
+    @VisibleForTesting
+    void receiveGossipStatus(final GossipStatus status) {
         //Don't accept messages from non-members
         if (!clusterMembers.contains(status.from())) {
             return;
@@ -250,7 +252,6 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
     }
 
     /**
@@ -258,18 +259,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param envelope contains buckets from a remote gossiper
      */
-    <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
+    @VisibleForTesting
+    <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
-            if (log.isTraceEnabled()) {
-                log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
-                        envelope.to());
-            }
+            LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
             return;
         }
 
         updateRemoteBuckets(envelope.getBuckets());
-
     }
 
     /**
@@ -277,9 +275,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param buckets map of Buckets to update
      */
-    <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
-        UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
-        getContext().parent().tell(updateRemoteBuckets, getSelf());
+    @VisibleForTesting
+    <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
+        getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
     }
 
     /**
@@ -300,7 +298,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param remoteActorSystemAddress remote gossiper to send to
      */
-    void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
+    @VisibleForTesting
+    void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
@@ -310,10 +309,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
 
-        log.trace("Sending bucket versions to [{}]", remoteRef);
+        LOG.trace("Sending bucket versions to [{}]", remoteRef);
 
         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-
     }
 
     /**
@@ -322,13 +320,13 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param remote        remote gossiper to send versions to
      * @param localVersions bucket versions received from local store
      */
-    void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
+    void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
 
         GossipStatus status = new GossipStatus(selfAddress, localVersions);
         remote.tell(status, getSelf());
     }
 
-    void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
+    void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
 
         GossipStatus status = new GossipStatus(selfAddress, localVersions);
         remote.tell(status, getSelf());
@@ -342,7 +340,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
@@ -379,7 +377,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
@@ -400,7 +398,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                         Long remoteVersion = entry.getValue();
                         Long localVersion = localVersions.get(address);
                         if (localVersion == null || remoteVersion == null) {
-                            continue; //this condition is taken care of by above diffs
+                            //this condition is taken care of by above diffs
+                            continue;
                         }
 
                         if (localVersion < remoteVersion) {
@@ -411,13 +410,13 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     }
 
                     if (!localIsOlder.isEmpty()) {
-                        sendGossipStatusTo(sender, localVersions );
+                        sendGossipStatusTo(sender, localVersions);
                     }
 
                     if (!localIsNewer.isEmpty()) {
-                        sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+                        //send newer buckets to remote
+                        sendGossipTo(sender, localIsNewer);
                     }
-
                 }
                 return null;
             }
@@ -441,10 +440,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         return new Mapper<Object, Void>() {
             @SuppressWarnings({ "rawtypes", "unchecked" })
             @Override
-            public Void apply(Object msg) {
+            public Void apply(final Object msg) {
                 if (msg instanceof GetBucketsByMembersReply) {
                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    log.trace("Buckets to send from {}: {}", selfAddress, buckets);
+                    LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
                     sender.tell(envelope, getSelf());
                 }
@@ -456,19 +455,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     ///
     ///Getter Setters
     ///
-    List<Address> getClusterMembers() {
-        return clusterMembers;
-    }
 
-    void setClusterMembers(List<Address> clusterMembers) {
-        this.clusterMembers = clusterMembers;
-    }
-
-    Address getSelfAddress() {
-        return selfAddress;
-    }
-
-    void setSelfAddress(Address selfAddress) {
-        this.selfAddress = selfAddress;
+    @VisibleForTesting
+    void setClusterMembers(final Address... members) {
+        clusterMembers.clear();
+        clusterMembers.addAll(Arrays.asList(members));
     }
 }