BUG-7594: Rework sal-remoterpc-connector messages
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 2c47c4e..015c0a1 100644 (file)
@@ -17,8 +17,6 @@ import akka.cluster.Cluster;
 import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
@@ -27,21 +25,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -63,6 +52,13 @@ import scala.concurrent.duration.FiniteDuration;
  * for update.
  */
 public class Gossiper extends AbstractUntypedActorWithMetering {
+    private static final Object GOSSIP_TICK = new Object() {
+        @Override
+        public String toString() {
+            return "gossip tick";
+        }
+    };
+
     private final boolean autoStartGossipTicks;
     private final RemoteRpcProviderConfig config;
 
@@ -85,6 +81,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     private Cancellable gossipTask;
 
+    private BucketStoreAccess bucketStore;
+
     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
         this.config = Preconditions.checkNotNull(config);
         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
@@ -107,6 +105,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
+        bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+
         if (provider instanceof ClusterActorRefProvider ) {
             cluster = Cluster.get(getContext().system());
             cluster.subscribe(getSelf(),
@@ -121,7 +121,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
                     config.getGossipTickInterval(),                 //interval
                     getSelf(),                                      //target
-                    new Messages.GossiperMessages.GossipTick(),     //message
+                    GOSSIP_TICK,                                    //message
                     getContext().dispatcher(),                      //execution context
                     getSelf()                                       //sender
             );
@@ -138,12 +138,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         }
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     protected void handleReceive(final Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
-        if (message instanceof GossipTick) {
+        if (GOSSIP_TICK.equals(message)) {
             receiveGossipTick();
         } else if (message instanceof GossipStatus) {
             // Message from remote gossiper with its bucket versions
@@ -197,7 +196,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     private void removePeer(final Address address) {
         clusterMembers.remove(address);
         peers.remove(address);
-        getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
+        bucketStore.removeRemoteBucket(address);
     }
 
     /**
@@ -258,15 +257,53 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     @VisibleForTesting
     void receiveGossipStatus(final GossipStatus status) {
         // Don't accept messages from non-members
-        if (!peers.containsKey(status.from())) {
-            return;
+        if (peers.containsKey(status.from())) {
+            // FIXME: sender should be part of GossipStatus
+            final ActorRef sender = getSender();
+            bucketStore.getBucketVersions(versions ->  processRemoteStatus(sender, status, versions));
+        }
+    }
+
+    private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
+            final Map<Address, Long> localVersions) {
+        final Map<Address, Long> remoteVersions = status.versions();
+
+        //diff between remote list and local
+        final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
+        localIsOlder.removeAll(localVersions.keySet());
+
+        //diff between local list and remote
+        final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
+        localIsNewer.removeAll(remoteVersions.keySet());
+
+
+        for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
+            Address address = entry.getKey();
+            Long remoteVersion = entry.getValue();
+            Long localVersion = localVersions.get(address);
+            if (localVersion == null || remoteVersion == null) {
+                //this condition is taken care of by above diffs
+                continue;
+            }
+
+            if (localVersion < remoteVersion) {
+                localIsOlder.add(address);
+            } else if (localVersion > remoteVersion) {
+                localIsNewer.add(address);
+            }
         }
 
-        final ActorRef sender = getSender();
-        Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+        if (!localIsOlder.isEmpty()) {
+            remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
+        }
 
-        futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
+        if (!localIsNewer.isEmpty()) {
+            //send newer buckets to remote
+            bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
+                LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
+                remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
+            });
+        }
     }
 
     /**
@@ -275,14 +312,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param envelope contains buckets from a remote gossiper
      */
     @VisibleForTesting
-    <T extends BucketData<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
+    void receiveGossip(final GossipEnvelope envelope) {
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
             return;
         }
 
-        updateRemoteBuckets(envelope.getBuckets());
+        updateRemoteBuckets(envelope.buckets());
     }
 
     /**
@@ -291,21 +328,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param buckets map of Buckets to update
      */
     @VisibleForTesting
-    <T extends BucketData<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
-        getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
-    }
-
-    /**
-     * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
-     *
-     * @param remote     remote node to send Buckets to
-     * @param addresses  node addresses whose buckets needs to be sent
-     */
-    void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
-
-        Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
-        futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
+    void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
+        bucketStore.updateRemoteBuckets(buckets);
     }
 
     /**
@@ -315,133 +339,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     @VisibleForTesting
     void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
-
-        //Get local status from bucket store and send to remote
-        Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
-
-        LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
-
-        futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
-    }
-
-    ///
-    /// Private factories to create mappers
-    ///
-
-    private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(final Object replyMessage) {
-                if (replyMessage instanceof GetBucketVersionsReply) {
-                    GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
-                    Map<Address, Long> localVersions = reply.getVersions();
-
-                    remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Process bucket versions received from
-     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
-     * Then this method compares remote bucket versions with local bucket versions.
-     * <ul>
-     *     <li>The buckets that are newer locally, send
-     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
-     *     to remote
-     *     <li>The buckets that are older locally, send
-     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
-     *     to remote so that remote sends GossipEnvelop.
-     * </ul>
-     *
-     * @param sender the remote member
-     * @param status bucket versions from a remote member
-     * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
-     *
-     */
-    private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
-
-        final Map<Address, Long> remoteVersions = status.getVersions();
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(final Object replyMessage) {
-                if (replyMessage instanceof GetBucketVersionsReply) {
-                    GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
-                    Map<Address, Long> localVersions = reply.getVersions();
-
-                    //diff between remote list and local
-                    Set<Address> localIsOlder = new HashSet<>();
-                    localIsOlder.addAll(remoteVersions.keySet());
-                    localIsOlder.removeAll(localVersions.keySet());
-
-                    //diff between local list and remote
-                    Set<Address> localIsNewer = new HashSet<>();
-                    localIsNewer.addAll(localVersions.keySet());
-                    localIsNewer.removeAll(remoteVersions.keySet());
-
-
-                    for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
-                        Address address = entry.getKey();
-                        Long remoteVersion = entry.getValue();
-                        Long localVersion = localVersions.get(address);
-                        if (localVersion == null || remoteVersion == null) {
-                            //this condition is taken care of by above diffs
-                            continue;
-                        }
-
-                        if (localVersion < remoteVersion) {
-                            localIsOlder.add(address);
-                        } else if (localVersion > remoteVersion) {
-                            localIsNewer.add(address);
-                        }
-                    }
-
-                    if (!localIsOlder.isEmpty()) {
-                        sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
-                    }
-
-                    if (!localIsNewer.isEmpty()) {
-                        //send newer buckets to remote
-                        sendGossipTo(sender, localIsNewer);
-                    }
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
-     * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
-     * These buckets are sent to a remote member encapsulated in
-     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
-     *
-     * @param sender the remote member that sent
-     *           {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
-     *           in reply to which bucket is being sent back
-     * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
-     *
-     */
-    private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
-
-        return new Mapper<Object, Void>() {
-            @SuppressWarnings({ "rawtypes", "unchecked" })
-            @Override
-            public Void apply(final Object msg) {
-                if (msg instanceof GetBucketsByMembersReply) {
-                    Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
-                    GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
-                    sender.tell(envelope, getSelf());
-                }
-                return null;
-            }
-        };
+        bucketStore.getBucketVersions(versions -> {
+            LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
+            /*
+             * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
+             *      but can we identify which bucket is the local one?
+             */
+            remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
+        });
     }
 
     ///