BUG-7573: add BucketStore source monitoring
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 9230591d46b4d80e244d3d81c653dfebc559a4cf..2c47c4e2a9242a7d9c56a8dc30ade963b83450b7 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
 import akka.actor.Address;
 import akka.actor.Cancellable;
+import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
@@ -20,7 +21,9 @@ import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,67 +36,70 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Gossiper that syncs bucket store across nodes in the cluster.
- * <p/>
+ *
+ * <p>
  * It keeps a local scheduler that periodically sends Gossip ticks to
  * itself to send bucket store's bucket versions to a randomly selected remote
  * gossiper.
- * <p/>
+ *
+ * <p>
  * When bucket versions are received from a remote gossiper, it is compared
  * with bucket store's bucket versions. Which ever buckets are newer
  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
- * <p/>
+ *
+ * <p>
  * When a bucket is received from a remote gossiper, its sent to the bucket store
  * for update.
- *
  */
 public class Gossiper extends AbstractUntypedActorWithMetering {
+    private final boolean autoStartGossipTicks;
+    private final RemoteRpcProviderConfig config;
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    /**
+     * All known cluster members.
+     */
+    private final List<Address> clusterMembers = new ArrayList<>();
 
-    private Cluster cluster;
+    /**
+     * Cached ActorSelections for remote peers.
+     */
+    private final Map<Address, ActorSelection> peers = new HashMap<>();
 
     /**
      * ActorSystem's address for the current cluster node.
      */
     private Address selfAddress;
 
-    /**
-     * All known cluster members.
-     */
-    private List<Address> clusterMembers = new ArrayList<>();
+    private Cluster cluster;
 
     private Cancellable gossipTask;
 
-    private Boolean autoStartGossipTicks = true;
+    Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+        this.config = Preconditions.checkNotNull(config);
+        this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+    }
 
-    private final RemoteRpcProviderConfig config;
+    Gossiper(final RemoteRpcProviderConfig config) {
+        this(config, Boolean.TRUE);
+    }
 
-    public Gossiper(RemoteRpcProviderConfig config) {
-        this.config = Preconditions.checkNotNull(config);
+    public static Props props(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config);
     }
 
-    /**
-     * Constructor for testing.
-     *
-     * @param autoStartGossipTicks used for turning off gossip ticks during testing.
-     *                             Gossip tick can be manually sent.
-     */
-    @VisibleForTesting
-    public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
-        this(config);
-        this.autoStartGossipTicks = autoStartGossipTicks;
+    static Props testProps(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config, Boolean.FALSE);
     }
 
     @Override
@@ -101,11 +107,12 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider ) {
+        if (provider instanceof ClusterActorRefProvider ) {
             cluster = Cluster.get(getContext().system());
             cluster.subscribe(getSelf(),
                     ClusterEvent.initialStateAsEvents(),
                     ClusterEvent.MemberEvent.class,
+                    ClusterEvent.ReachableMember.class,
                     ClusterEvent.UnreachableMember.class);
         }
 
@@ -113,10 +120,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
                     config.getGossipTickInterval(),                 //interval
-                    getSelf(),                                       //target
-                    new Messages.GossiperMessages.GossipTick(),      //message
-                    getContext().dispatcher(),                       //execution context
-                    getSelf()                                        //sender
+                    getSelf(),                                      //target
+                    new Messages.GossiperMessages.GossipTick(),     //message
+                    getContext().dispatcher(),                      //execution context
+                    getSelf()                                       //sender
             );
         }
     }
@@ -133,7 +140,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
-    protected void handleReceive(Object message) throws Exception {
+    protected void handleReceive(final Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
         if (message instanceof GossipTick) {
@@ -147,12 +154,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             // comparing the GossipStatus message with its local versions.
             receiveGossip((GossipEnvelope) message);
         } else if (message instanceof ClusterEvent.MemberUp) {
-            receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+            receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
+
+        } else if (message instanceof ClusterEvent.ReachableMember) {
+            receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
 
         } else if (message instanceof ClusterEvent.MemberRemoved) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
 
-        } else if ( message instanceof ClusterEvent.UnreachableMember) {
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
 
         } else {
@@ -165,15 +175,29 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param member who went down
      */
-    void receiveMemberRemoveOrUnreachable(Member member) {
+    private void receiveMemberRemoveOrUnreachable(final Member member) {
         //if its self, then stop itself
         if (selfAddress.equals(member.address())) {
             getContext().stop(getSelf());
             return;
         }
 
-        clusterMembers.remove(member.address());
-        log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        removePeer(member.address());
+        LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+    }
+
+    private void addPeer(final Address address) {
+        if (!clusterMembers.contains(address)) {
+            clusterMembers.add(address);
+        }
+        peers.computeIfAbsent(address, input -> getContext().system()
+            .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
+    }
+
+    private void removePeer(final Address address) {
+        clusterMembers.remove(address);
+        peers.remove(address);
+        getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
     }
 
     /**
@@ -181,17 +205,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param member the member to add
      */
-    void receiveMemberUp(Member member) {
-
+    private void receiveMemberUpOrReachable(final Member member) {
+        //ignore up notification for self
         if (selfAddress.equals(member.address())) {
-            return; //ignore up notification for self
-        }
-
-        if (!clusterMembers.contains(member.address())) {
-            clusterMembers.add(member.address());
+            return;
         }
 
-        log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        addPeer(member.address());
+        LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
     }
 
     /**
@@ -201,22 +222,24 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
      */
+    @VisibleForTesting
     void receiveGossipTick() {
-        if (clusterMembers.size() == 0) {
-            return; //no members to send gossip status to
-        }
-
-        Address remoteMemberToGossipTo;
-
-        if (clusterMembers.size() == 1) {
-            remoteMemberToGossipTo = clusterMembers.get(0);
-        } else {
-            Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
-            remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+        final Address address;
+        switch (clusterMembers.size()) {
+            case 0:
+                //no members to send gossip status to
+                return;
+            case 1:
+                address = clusterMembers.get(0);
+                break;
+            default:
+                final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+                address = clusterMembers.get(randomIndex);
+                break;
         }
 
-        log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
-        getLocalStatusAndSendTo(remoteMemberToGossipTo);
+        LOG.trace("Gossiping to [{}]", address);
+        getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
     }
 
     /**
@@ -232,9 +255,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param status bucket versions from a remote member
      */
-    void receiveGossipStatus(GossipStatus status) {
-        //Don't accept messages from non-members
-        if (!clusterMembers.contains(status.from())) {
+    @VisibleForTesting
+    void receiveGossipStatus(final GossipStatus status) {
+        // Don't accept messages from non-members
+        if (!peers.containsKey(status.from())) {
             return;
         }
 
@@ -243,7 +267,6 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
     }
 
     /**
@@ -251,18 +274,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param envelope contains buckets from a remote gossiper
      */
-    <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
+    @VisibleForTesting
+    <T extends BucketData<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
-            if (log.isTraceEnabled()) {
-                log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
-                        envelope.to());
-            }
+            LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
             return;
         }
 
         updateRemoteBuckets(envelope.getBuckets());
-
     }
 
     /**
@@ -270,9 +290,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param buckets map of Buckets to update
      */
-    <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
-        UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
-        getContext().parent().tell(updateRemoteBuckets, getSelf());
+    @VisibleForTesting
+    <T extends BucketData<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
+        getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
     }
 
     /**
@@ -293,38 +313,16 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param remoteActorSystemAddress remote gossiper to send to
      */
-    void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
+    @VisibleForTesting
+    void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
-        //Find gossiper on remote system
-        ActorSelection remoteRef = getContext().system().actorSelection(
-                remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
-
-        log.trace("Sending bucket versions to [{}]", remoteRef);
-
-        futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
+        LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
 
-    }
-
-    /**
-     * Helper to send bucket versions received from local store.
-     *
-     * @param remote        remote gossiper to send versions to
-     * @param localVersions bucket versions received from local store
-     */
-    void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
-
-        GossipStatus status = new GossipStatus(selfAddress, localVersions);
-        remote.tell(status, getSelf());
-    }
-
-    void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
-
-        GossipStatus status = new GossipStatus(selfAddress, localVersions);
-        remote.tell(status, getSelf());
+        futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
     }
 
     ///
@@ -335,13 +333,12 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
 
-                    sendGossipStatusTo(remote, localVersions);
-
+                    remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
                 }
                 return null;
             }
@@ -372,7 +369,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
@@ -388,26 +385,30 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     localIsNewer.removeAll(remoteVersions.keySet());
 
 
-                    for (Address address : remoteVersions.keySet()) {
-
-                        if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
-                            continue; //this condition is taken care of by above diffs
+                    for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
+                        Address address = entry.getKey();
+                        Long remoteVersion = entry.getValue();
+                        Long localVersion = localVersions.get(address);
+                        if (localVersion == null || remoteVersion == null) {
+                            //this condition is taken care of by above diffs
+                            continue;
                         }
-                        if (localVersions.get(address) <  remoteVersions.get(address)) {
+
+                        if (localVersion < remoteVersion) {
                             localIsOlder.add(address);
-                        } else if (localVersions.get(address) > remoteVersions.get(address)) {
+                        } else if (localVersion > remoteVersion) {
                             localIsNewer.add(address);
                         }
                     }
 
                     if (!localIsOlder.isEmpty()) {
-                        sendGossipStatusTo(sender, localVersions );
+                        sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
                     }
 
                     if (!localIsNewer.isEmpty()) {
-                        sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+                        //send newer buckets to remote
+                        sendGossipTo(sender, localIsNewer);
                     }
-
                 }
                 return null;
             }
@@ -431,10 +432,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         return new Mapper<Object, Void>() {
             @SuppressWarnings({ "rawtypes", "unchecked" })
             @Override
-            public Void apply(Object msg) {
+            public Void apply(final Object msg) {
                 if (msg instanceof GetBucketsByMembersReply) {
                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    log.trace("Buckets to send from {}: {}", selfAddress, buckets);
+                    LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
                     sender.tell(envelope, getSelf());
                 }
@@ -446,19 +447,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     ///
     ///Getter Setters
     ///
-    List<Address> getClusterMembers() {
-        return clusterMembers;
-    }
-
-    void setClusterMembers(List<Address> clusterMembers) {
-        this.clusterMembers = clusterMembers;
-    }
 
-    Address getSelfAddress() {
-        return selfAddress;
-    }
+    @VisibleForTesting
+    void setClusterMembers(final Address... members) {
+        clusterMembers.clear();
+        peers.clear();
 
-    void setSelfAddress(Address selfAddress) {
-        this.selfAddress = selfAddress;
+        for (Address addr : members) {
+            addPeer(addr);
+        }
     }
 }