Fix modernization issues
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 015c0a1b0e46d0bf180a78cfe781253997930c20..f43a1d9f9613e3871c6190948c12fb7f7172e1e0 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
@@ -18,8 +21,7 @@ import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,7 +32,7 @@ import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -60,7 +62,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     };
 
     private final boolean autoStartGossipTicks;
-    private final RemoteRpcProviderConfig config;
+    private final RemoteOpsProviderConfig config;
 
     /**
      * All known cluster members.
@@ -83,20 +85,20 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     private BucketStoreAccess bucketStore;
 
-    Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
-        this.config = Preconditions.checkNotNull(config);
+    Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) {
+        this.config = requireNonNull(config);
         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
     }
 
-    Gossiper(final RemoteRpcProviderConfig config) {
+    Gossiper(final RemoteOpsProviderConfig config) {
         this(config, Boolean.TRUE);
     }
 
-    public static Props props(final RemoteRpcProviderConfig config) {
+    public static Props props(final RemoteOpsProviderConfig config) {
         return Props.create(Gossiper.class, config);
     }
 
-    static Props testProps(final RemoteRpcProviderConfig config) {
+    static Props testProps(final RemoteOpsProviderConfig config) {
         return Props.create(Gossiper.class, config, Boolean.FALSE);
     }
 
@@ -105,9 +107,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+        bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
 
-        if (provider instanceof ClusterActorRefProvider ) {
+        if (provider instanceof ClusterActorRefProvider) {
             cluster = Cluster.get(getContext().system());
             cluster.subscribe(getSelf(),
                     ClusterEvent.initialStateAsEvents(),
@@ -139,7 +141,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     }
 
     @Override
-    protected void handleReceive(final Object message) throws Exception {
+    protected void handleReceive(final Object message) {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
         if (GOSSIP_TICK.equals(message)) {
@@ -175,6 +177,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param member who went down
      */
     private void receiveMemberRemoveOrUnreachable(final Member member) {
+        LOG.debug("Received memberDown or Unreachable: {}", member);
+
         //if its self, then stop itself
         if (selfAddress.equals(member.address())) {
             getContext().stop(getSelf());
@@ -205,6 +209,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param member the member to add
      */
     private void receiveMemberUpOrReachable(final Member member) {
+        LOG.debug("Received memberUp or reachable: {}", member);
+
         //ignore up notification for self
         if (selfAddress.equals(member.address())) {
             return;
@@ -238,7 +244,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         }
 
         LOG.trace("Gossiping to [{}]", address);
-        getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
+        getLocalStatusAndSendTo(verifyNotNull(peers.get(address)));
     }
 
     /**
@@ -329,7 +335,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     @VisibleForTesting
     void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
-        bucketStore.updateRemoteBuckets(buckets);
+        // filter this so we only handle buckets for known peers
+        bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey));
     }
 
     /**