BUG-7594: Rework sal-remoterpc-connector messages
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStoreActor.java
@@ -8,7 +8,8 @@
 
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
@@ -27,21 +28,16 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.collect.HashMultimap;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.SetMultimap;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 
 /**
  * A store that syncs its data across nodes in the cluster.
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -51,9 +47,15 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
  * <p>
  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  * <p>
  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
- *
  */
  */
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
+public abstract class BucketStoreActor<T extends BucketData<T>> extends
+        AbstractUntypedPersistentActorWithMetering {
+    // Internal marker interface for messages which are just bridges to execute a method
+    @FunctionalInterface
+    private interface ExecuteInActor extends Consumer<BucketStoreActor<?>> {
+
+    }
+
     /**
      * Buckets owned by other known nodes in the cluster.
      */
     /**
      * Buckets owned by other known nodes in the cluster.
      */
@@ -86,14 +88,38 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
     private Integer incarnation;
     private boolean persisting;
 
     private Integer incarnation;
     private boolean persisting;
 
-    public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
+    protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
         this.config = Preconditions.checkNotNull(config);
         this.initialData = Preconditions.checkNotNull(initialData);
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
     }
 
         this.config = Preconditions.checkNotNull(config);
         this.initialData = Preconditions.checkNotNull(initialData);
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
     }
 
+    static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
+        return actor -> actor.getBucketsByMembers(members);
+    }
+
+    static ExecuteInActor removeBucketMessage(final Address addr) {
+        return actor -> actor.removeBucket(addr);
+    }
+
+    static ExecuteInActor updateRemoteBucketsMessage(final Map<Address, Bucket<?>> buckets) {
+        return actor -> actor.updateRemoteBuckets(buckets);
+    }
+
+    public final T getLocalData() {
+        return getLocalBucket().getData();
+    }
+
+    public final Map<Address, Bucket<T>> getRemoteBuckets() {
+        return remoteBuckets;
+    }
+
+    public final Map<Address, Long> getVersions() {
+        return versions;
+    }
+
     @Override
     @Override
-    public String persistenceId() {
+    public final String persistenceId() {
         return persistenceId;
     }
 
         return persistenceId;
     }
 
@@ -107,12 +133,11 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void handleCommand(final Object message) throws Exception {
     @Override
     protected void handleCommand(final Object message) throws Exception {
-        if (message instanceof GetAllBuckets) {
+        if (GET_ALL_BUCKETS == message) {
             // GetAllBuckets is used only in testing
             // GetAllBuckets is used only in testing
-            receiveGetAllBuckets();
+            getSender().tell(getAllBuckets(), self());
             return;
         }
 
             return;
         }
 
@@ -121,14 +146,11 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
             return;
         }
 
             return;
         }
 
-        if (message instanceof GetBucketsByMembers) {
-            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
-        } else if (message instanceof GetBucketVersions) {
-            receiveGetBucketVersions();
-        } else if (message instanceof UpdateRemoteBuckets) {
-            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
-        } else if (message instanceof RemoveRemoteBucket) {
-            removeBucket(((RemoveRemoteBucket) message).getAddress());
+        if (message instanceof ExecuteInActor) {
+            ((ExecuteInActor) message).accept(this);
+        } else if (GET_BUCKET_VERSIONS == message) {
+            // FIXME: do we need to send ourselves?
+            getSender().tell(ImmutableMap.copyOf(versions), getSelf());
         } else if (message instanceof Terminated) {
             actorTerminated((Terminated) message);
         } else if (message instanceof DeleteSnapshotsSuccess) {
         } else if (message instanceof Terminated) {
             actorTerminated((Terminated) message);
         } else if (message instanceof DeleteSnapshotsSuccess) {
@@ -161,7 +183,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
     }
 
     @Override
     }
 
     @Override
-    protected void handleRecover(final Object message) throws Exception {
+    protected final void handleRecover(final Object message) throws Exception {
         if (message instanceof RecoveryCompleted) {
             if (incarnation != null) {
                 incarnation = incarnation + 1;
         if (message instanceof RecoveryCompleted) {
             if (incarnation != null) {
                 incarnation = incarnation + 1;
@@ -182,25 +204,47 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
         }
     }
 
-    protected RemoteRpcProviderConfig getConfig() {
+    protected final RemoteRpcProviderConfig getConfig() {
         return config;
     }
 
         return config;
     }
 
+    protected final void updateLocalBucket(final T data) {
+        final LocalBucket<T> local = getLocalBucket();
+        final boolean bumpIncarnation = local.setData(data);
+        versions.put(selfAddress, local.getVersion());
+
+        if (bumpIncarnation) {
+            LOG.debug("Version wrapped. incrementing incarnation");
+
+            Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+            incarnation = incarnation + 1;
+
+            persisting = true;
+            saveSnapshot(incarnation);
+        }
+    }
+
     /**
     /**
-     * Returns all the buckets the this node knows about, self owned + remote.
+     * Callback to subclasses invoked when a bucket is removed.
+     *
+     * @param address Remote address
+     * @param bucket Bucket removed
      */
      */
-    @VisibleForTesting
-    protected void receiveGetAllBuckets() {
-        final ActorRef sender = getSender();
-        sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
-    }
+    protected abstract void onBucketRemoved(final Address address, final Bucket<T> bucket);
+
+    /**
+     * Callback to subclasses invoked when the set of remote buckets is updated.
+     *
+     * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+     */
+    protected abstract void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets);
 
     /**
      * Helper to collect all known buckets.
      *
      * @return self owned + remote buckets
      */
 
     /**
      * Helper to collect all known buckets.
      *
      * @return self owned + remote buckets
      */
-    Map<Address, Bucket<T>> getAllBuckets() {
+    private Map<Address, Bucket<T>> getAllBuckets() {
         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
@@ -212,24 +256,12 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         return all;
     }
 
         return all;
     }
 
-    /**
-     * Returns buckets for requested members that this node knows about.
-     *
-     * @param members requested members
-     */
-    void receiveGetBucketsByMembers(final Set<Address> members) {
-        final ActorRef sender = getSender();
-        Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
-        sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
-    }
-
     /**
      * Helper to collect buckets for requested members.
      *
      * @param members requested members
     /**
      * Helper to collect buckets for requested members.
      *
      * @param members requested members
-     * @return buckets for requested members
      */
      */
-    Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
+    private void getBucketsByMembers(final Collection<Address> members) {
         Map<Address, Bucket<T>> buckets = new HashMap<>();
 
         //first add the local bucket if asked
         Map<Address, Bucket<T>> buckets = new HashMap<>();
 
         //first add the local bucket if asked
@@ -244,16 +276,15 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
             }
         }
 
             }
         }
 
-        return buckets;
+        getSender().tell(buckets, getSelf());
     }
 
     }
 
-    /**
-     * Returns versions for all buckets known.
-     */
-    void receiveGetBucketVersions() {
-        final ActorRef sender = getSender();
-        GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
-        sender.tell(reply, getSelf());
+    private void removeBucket(final Address addr) {
+        final Bucket<T> bucket = remoteBuckets.remove(addr);
+        if (bucket != null) {
+            bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+            onBucketRemoved(addr, bucket);
+        }
     }
 
     /**
     }
 
     /**
@@ -262,7 +293,8 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
-    void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+    @VisibleForTesting
+    void updateRemoteBuckets(final Map<Address, Bucket<?>> receivedBuckets) {
         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
             //nothing to do
         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
             //nothing to do
@@ -270,7 +302,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
 
         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
         }
 
         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
-        for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+        for (Entry<Address, Bucket<?>> entry : receivedBuckets.entrySet()) {
             final Address addr = entry.getKey();
 
             if (selfAddress.equals(addr)) {
             final Address addr = entry.getKey();
 
             if (selfAddress.equals(addr)) {
@@ -278,7 +310,8 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
                 continue;
             }
 
                 continue;
             }
 
-            final Bucket<T> receivedBucket = entry.getValue();
+            @SuppressWarnings("unchecked")
+            final Bucket<T> receivedBucket = (Bucket<T>) entry.getValue();
             if (receivedBucket == null) {
                 LOG.debug("Ignoring null bucket from {}", addr);
                 continue;
             if (receivedBucket == null) {
                 LOG.debug("Ignoring null bucket from {}", addr);
                 continue;
@@ -328,14 +361,6 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
         }
     }
 
-    private void removeBucket(final Address addr) {
-        final Bucket<T> bucket = remoteBuckets.remove(addr);
-        if (bucket != null) {
-            bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
-            onBucketRemoved(addr, bucket);
-        }
-    }
-
     private void actorTerminated(final Terminated message) {
         LOG.info("Actor termination {} received", message);
 
     private void actorTerminated(final Terminated message) {
         LOG.info("Actor termination {} received", message);
 
@@ -349,60 +374,13 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
         }
     }
 
-    /**
-     * Callback to subclasses invoked when a bucket is removed.
-     *
-     * @param address Remote address
-     * @param bucket Bucket removed
-     */
-    protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
-        // Default noop
-    }
-
-    /**
-     * Callback to subclasses invoked when the set of remote buckets is updated.
-     *
-     * @param newBuckets Map of address to new bucket. Never null, but can be empty.
-     */
-    protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
-        // Default noop
-    }
-
     @VisibleForTesting
     protected boolean isPersisting() {
         return persisting;
     }
 
     @VisibleForTesting
     protected boolean isPersisting() {
         return persisting;
     }
 
-    public T getLocalData() {
-        return getLocalBucket().getData();
-    }
-
     private LocalBucket<T> getLocalBucket() {
         Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
         return localBucket;
     }
     private LocalBucket<T> getLocalBucket() {
         Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
         return localBucket;
     }
-
-    protected void updateLocalBucket(final T data) {
-        final LocalBucket<T> local = getLocalBucket();
-        final boolean bumpIncarnation = local.setData(data);
-        versions.put(selfAddress, local.getVersion());
-
-        if (bumpIncarnation) {
-            LOG.debug("Version wrapped. incrementing incarnation");
-
-            Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
-            incarnation = incarnation + 1;
-
-            persisting = true;
-            saveSnapshot(incarnation);
-        }
-    }
-
-    public Map<Address, Bucket<T>> getRemoteBuckets() {
-        return remoteBuckets;
-    }
-
-    public Map<Address, Long> getVersions() {
-        return versions;
-    }
 }
 }