BUG-3128: rework sal-remoterpc-connector
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 0b64136c49f9bda7bb331c9f5111d2146b91da9d..261da12ae13652889c280b6ef5deddb6d8b25944 100644 (file)
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
 import akka.actor.Address;
 import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
+import akka.actor.Props;
 import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
 import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.pattern.Patterns;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Gossiper that syncs bucket store across nodes in the cluster.
- * <p>
- * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
- * to a randomly selected remote gossiper.
- * <p>
- * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
- * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
- * gossip status is sent to remote gossiper so that it can send the newer buckets.
- * <p>
- * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
+ * <p/>
+ * It keeps a local scheduler that periodically sends Gossip ticks to
+ * itself to send bucket store's bucket versions to a randomly selected remote
+ * gossiper.
+ * <p/>
+ * When bucket versions are received from a remote gossiper, it is compared
+ * with bucket store's bucket versions. Which ever buckets are newer
+ * locally, are sent to remote gossiper. If any bucket is older in bucket store,
+ * a gossip status is sent to remote gossiper so that it can send the newer buckets.
+ * <p/>
+ * When a bucket is received from a remote gossiper, its sent to the bucket store
+ * for update.
  *
  */
 
-public class Gossiper extends UntypedActor {
-
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
-    Cluster cluster = Cluster.get(getContext().system());
+public class Gossiper extends AbstractUntypedActorWithMetering {
+    private final boolean autoStartGossipTicks;
+    private final RemoteRpcProviderConfig config;
 
     /**
-     * ActorSystem's address for the current cluster node.
+     * All known cluster members.
      */
-    private Address selfAddress = cluster.selfAddress();
+    private final List<Address> clusterMembers = new ArrayList<>();
 
     /**
-     * All known cluster members
+     * ActorSystem's address for the current cluster node.
      */
-    private List<Address> clusterMembers = new ArrayList<>();
+    private Address selfAddress;
+
+    private Cluster cluster;
 
     private Cancellable gossipTask;
 
-    private Boolean autoStartGossipTicks = true;
+    Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+        this.config = Preconditions.checkNotNull(config);
+        this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+    }
 
-    public Gossiper(){}
+    Gossiper(final RemoteRpcProviderConfig config) {
+        this(config, Boolean.TRUE);
+    }
 
-    /**
-     * Helpful for testing
-     * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
-     */
-    public Gossiper(Boolean autoStartGossipTicks){
-        this.autoStartGossipTicks = autoStartGossipTicks;
+    public static Props props(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config);
+    }
+
+    static Props testProps(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config, Boolean.FALSE);
     }
 
     @Override
     public void preStart(){
-
-        cluster.subscribe(getSelf(),
-                          ClusterEvent.initialStateAsEvents(),
-                          ClusterEvent.MemberEvent.class,
-                          ClusterEvent.UnreachableMember.class);
+        ActorRefProvider provider = getContext().provider();
+        selfAddress = provider.getDefaultAddress();
+
+        if (provider instanceof ClusterActorRefProvider ) {
+            cluster = Cluster.get(getContext().system());
+            cluster.subscribe(getSelf(),
+                    ClusterEvent.initialStateAsEvents(),
+                    ClusterEvent.MemberEvent.class,
+                    ClusterEvent.ReachableMember.class,
+                    ClusterEvent.UnreachableMember.class);
+        }
 
         if (autoStartGossipTicks) {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
-                    new FiniteDuration(500, TimeUnit.MILLISECONDS),         //interval
-                    getSelf(),                                       //target
-                    new Messages.GossiperMessages.GossipTick(),      //message
-                    getContext().dispatcher(),                       //execution context
-                    getSelf()                                        //sender
+                    config.getGossipTickInterval(),                 //interval
+                    getSelf(),                                      //target
+                    new Messages.GossiperMessages.GossipTick(),     //message
+                    getContext().dispatcher(),                      //execution context
+                    getSelf()                                       //sender
             );
         }
     }
 
     @Override
     public void postStop(){
-        if (cluster != null)
+        if (cluster != null) {
             cluster.unsubscribe(getSelf());
-        if (gossipTask != null)
+        }
+        if (gossipTask != null) {
             gossipTask.cancel();
+        }
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+    protected void handleReceive(final Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
-        if (message instanceof GossipTick)
+        if (message instanceof GossipTick) {
             receiveGossipTick();
-
-        //Message from remote gossiper with its bucket versions
-        else if (message instanceof GossipStatus)
+        } else if (message instanceof GossipStatus) {
+            // Message from remote gossiper with its bucket versions
             receiveGossipStatus((GossipStatus) message);
-
-        //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
-        //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
-        //message with its local versions
-        else if (message instanceof GossipEnvelope)
+        } else if (message instanceof GossipEnvelope) {
+            // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+            // message. The contained buckets are newer as determined by the remote gossiper by
+            // comparing the GossipStatus message with its local versions.
             receiveGossip((GossipEnvelope) message);
+        } else if (message instanceof ClusterEvent.MemberUp) {
+            receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
 
-        else if (message instanceof ClusterEvent.MemberUp) {
-            receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+        } else if (message instanceof ClusterEvent.ReachableMember) {
+            receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
 
         } else if (message instanceof ClusterEvent.MemberRemoved) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
 
-        } else if ( message instanceof ClusterEvent.UnreachableMember){
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
 
-        } else
+        } else {
             unhandled(message);
+        }
     }
 
     /**
@@ -149,7 +167,7 @@ public class Gossiper extends UntypedActor {
      *
      * @param member who went down
      */
-    void receiveMemberRemoveOrUnreachable(Member member) {
+    private void receiveMemberRemoveOrUnreachable(final Member member) {
         //if its self, then stop itself
         if (selfAddress.equals(member.address())){
             getContext().stop(getSelf());
@@ -157,22 +175,26 @@ public class Gossiper extends UntypedActor {
         }
 
         clusterMembers.remove(member.address());
-        log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+
+        getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
     }
 
     /**
      * Add member to the local copy of member list if it doesnt already
      * @param member
      */
-    void receiveMemberUp(Member member) {
-
-        if (selfAddress.equals(member.address()))
-            return; //ignore up notification for self
+    private void receiveMemberUpOrReachable(final Member member) {
+        //ignore up notification for self
+        if (selfAddress.equals(member.address())) {
+            return;
+        }
 
-        if (!clusterMembers.contains(member.address()))
+        if (!clusterMembers.contains(member.address())) {
             clusterMembers.add(member.address());
+        }
 
-        log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
     }
 
     /**
@@ -181,19 +203,23 @@ public class Gossiper extends UntypedActor {
      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
      */
-    void receiveGossipTick(){
-        if (clusterMembers.size() == 0) return; //no members to send gossip status to
-
-        Address remoteMemberToGossipTo = null;
-
-        if (clusterMembers.size() == 1)
-            remoteMemberToGossipTo = clusterMembers.get(0);
-        else {
-            Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
-            remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+    @VisibleForTesting
+    void receiveGossipTick() {
+        final Address remoteMemberToGossipTo;
+        switch (clusterMembers.size()) {
+            case 0:
+                //no members to send gossip status to
+                return;
+            case 1:
+                remoteMemberToGossipTo = clusterMembers.get(0);
+                break;
+            default:
+                final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+                remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+                break;
         }
 
-        log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+        LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
         getLocalStatusAndSendTo(remoteMemberToGossipTo);
     }
 
@@ -210,17 +236,18 @@ public class Gossiper extends UntypedActor {
      *
      * @param status bucket versions from a remote member
      */
-    void receiveGossipStatus(GossipStatus status){
-        //Dont want to accept messages from non-members
-        if (!clusterMembers.contains(status.from()))
+    @VisibleForTesting
+    void receiveGossipStatus(final GossipStatus status) {
+        //Don't accept messages from non-members
+        if (!clusterMembers.contains(status.from())) {
             return;
+        }
 
         final ActorRef sender = getSender();
-
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
     }
 
     /**
@@ -228,17 +255,15 @@ public class Gossiper extends UntypedActor {
      *
      * @param envelope contains buckets from a remote gossiper
      */
-    void receiveGossip(GossipEnvelope envelope){
+    @VisibleForTesting
+    <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
-            log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+            LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
             return;
         }
-        if (envelope.getBuckets() == null)
-            return; //nothing to do
 
         updateRemoteBuckets(envelope.getBuckets());
-
     }
 
     /**
@@ -246,14 +271,9 @@ public class Gossiper extends UntypedActor {
      *
      * @param buckets
      */
-    void updateRemoteBuckets(Map<Address, Bucket> buckets) {
-
-        if (buckets == null || buckets.isEmpty())
-            return; //nothing to merge
-
-        UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
-
-        getContext().parent().tell(updateRemoteBuckets, getSelf());
+    @VisibleForTesting
+    <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
+        getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
     }
 
     /**
@@ -264,10 +284,9 @@ public class Gossiper extends UntypedActor {
      */
     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
 
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
-
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
-
     }
 
     /**
@@ -275,18 +294,20 @@ public class Gossiper extends UntypedActor {
      *
      * @param remoteActorSystemAddress remote gossiper to send to
      */
-    void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
+    @VisibleForTesting
+    void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
 
         //Get local status from bucket store and send to remote
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
+        //Find gossiper on remote system
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
 
-        log.debug("Sending bucket versions to [{}]", remoteRef);
+        LOG.trace("Sending bucket versions to [{}]", remoteRef);
 
         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-
     }
 
     /**
@@ -294,13 +315,13 @@ public class Gossiper extends UntypedActor {
      * @param remote        remote gossiper to send versions to
      * @param localVersions bucket versions received from local store
      */
-    void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
+    void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
 
         GossipStatus status = new GossipStatus(selfAddress, localVersions);
         remote.tell(status, getSelf());
     }
 
-    void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
+    void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
 
         GossipStatus status = new GossipStatus(selfAddress, localVersions);
         remote.tell(status, getSelf());
@@ -314,7 +335,7 @@ public class Gossiper extends UntypedActor {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
@@ -328,14 +349,16 @@ public class Gossiper extends UntypedActor {
     }
 
     /**
-     * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
+     * Process bucket versions received from
+     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
      * Then this method compares remote bucket versions with local bucket versions.
      * <ul>
      *     <li>The buckets that are newer locally, send
-     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
+     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+     *     to remote
      *     <li>The buckets that are older locally, send
-     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
-     *     remote sends GossipEnvelop.
+     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+     *     to remote so that remote sends GossipEnvelop.
      * </ul>
      *
      * @param sender the remote member
@@ -349,7 +372,7 @@ public class Gossiper extends UntypedActor {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
@@ -365,24 +388,29 @@ public class Gossiper extends UntypedActor {
                     localIsNewer.removeAll(remoteVersions.keySet());
 
 
-                    for (Address address : remoteVersions.keySet()){
-
-                        if (localVersions.get(address) == null || remoteVersions.get(address) == null)
-                            continue; //this condition is taken care of by above diffs
-                        if (localVersions.get(address) <  remoteVersions.get(address))
+                    for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
+                        Address address = entry.getKey();
+                        Long remoteVersion = entry.getValue();
+                        Long localVersion = localVersions.get(address);
+                        if (localVersion == null || remoteVersion == null) {
+                            //this condition is taken care of by above diffs
+                            continue;
+                        }
+                        if (localVersions.get(address) <  remoteVersions.get(address)) {
                             localIsOlder.add(address);
-                        else if (localVersions.get(address) > remoteVersions.get(address))
+                        } else if (localVersions.get(address) > remoteVersions.get(address)) {
                             localIsNewer.add(address);
-                        else
-                            continue;
+                        }
                     }
 
-                    if (!localIsOlder.isEmpty())
-                        sendGossipStatusTo(sender, localVersions );
-
-                    if (!localIsNewer.isEmpty())
-                        sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+                    if (!localIsOlder.isEmpty()) {
+                        sendGossipStatusTo(sender, localVersions);
+                    }
 
+                    if (!localIsNewer.isEmpty()) {
+                        //send newer buckets to remote
+                        sendGossipTo(sender, localIsNewer);
+                    }
                 }
                 return null;
             }
@@ -390,9 +418,10 @@ public class Gossiper extends UntypedActor {
     }
 
     /**
-     * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
-     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
-     * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+     * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
+     * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
+     * These buckets are sent to a remote member encapsulated in
+     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
      *
      * @param sender the remote member that sent
      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
@@ -404,10 +433,10 @@ public class Gossiper extends UntypedActor {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object msg) {
+            public Void apply(final Object msg) {
                 if (msg instanceof GetBucketsByMembersReply) {
-                    Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    log.info("Buckets to send from {}: {}", selfAddress, buckets);
+                    Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
+                    LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
                     sender.tell(envelope, getSelf());
                 }
@@ -419,19 +448,10 @@ public class Gossiper extends UntypedActor {
     ///
     ///Getter Setters
     ///
-    List<Address> getClusterMembers() {
-        return clusterMembers;
-    }
-
-    void setClusterMembers(List<Address> clusterMembers) {
-        this.clusterMembers = clusterMembers;
-    }
-
-    Address getSelfAddress() {
-        return selfAddress;
-    }
 
-    void setSelfAddress(Address selfAddress) {
-        this.selfAddress = selfAddress;
+    @VisibleForTesting
+    void setClusterMembers(final Address... members) {
+        clusterMembers.clear();
+        clusterMembers.addAll(Arrays.asList(members));
     }
 }