BUG-7594: Rework sal-remoterpc-connector messages 29/50629/34
authorRobert Varga <rovarga@cisco.com>
Wed, 18 Jan 2017 15:01:21 +0000 (16:01 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 10 Feb 2017 12:23:23 +0000 (12:23 +0000)
This breaks compatibility by using DOMRpcIdentifier directly
in transferred messages. Since we are breaking compatibility, we can
also rework the messages and their locations, limiting their visiblity
and features (such as Serializable).

RoutingTable no longer uses RouteIdentifier, but rather relies
on DOMRpcIdentifier, which is serialized using
NormalizedNodeDataInput/Output primitives, so the serialization
format is not dependent on the package DOMRpcIdentifier comes from
and can be compatibly switched to mdsal-provided one.

Change-Id: Idf083f9d288be9c9684c7e8e8bd99fbaff0ad4ce
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Tomas Cere <tcere@cisco.com>
18 files changed:
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java with 76% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipEnvelope.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipStatus.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/utils/ConditionalProbe.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java

index 23e6e68..d477391 100644 (file)
             <artifactId>akka-persistence_${scala.version}</artifactId>
         </dependency>
         <!-- SAL Dependencies -->
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>sal-connector-api</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal-common-util</artifactId>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java
deleted file mode 100644 (file)
index 344649a..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-public class RouteIdentifierImpl
-        implements RpcRouter.RouteIdentifier<QName, QName, YangInstanceIdentifier>, Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final QName context;
-    private final QName type;
-    private final YangInstanceIdentifier route;
-
-    public RouteIdentifierImpl(final QName context, final QName type, final YangInstanceIdentifier route) {
-        Preconditions.checkNotNull(type, "Rpc type should not be null");
-        this.context = context;
-        this.type = type;
-        this.route = route;
-    }
-
-    @Override
-    public QName getContext() {
-        return context;
-    }
-
-    @Override
-    public QName getType() {
-        return type;
-    }
-
-    @Override
-    public YangInstanceIdentifier getRoute() {
-        return route;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        final RouteIdentifierImpl that = (RouteIdentifierImpl) obj;
-
-        if (context == null) {
-            if (that.getContext() != null) {
-                return false;
-            }
-        } else if (!context.equals(that.context)) {
-            return false;
-        }
-
-        if (route == null) {
-            if (that.getRoute() != null) {
-                return false;
-            }
-        } else if (!route.equals(that.route)) {
-            return false;
-        }
-
-        if (type == null) {
-            if (that.getType() != null) {
-                return false;
-            }
-        } else if (!type.equals(that.type)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 0;
-        result = prime * result + (context == null ? 0 : context.hashCode());
-        result = prime * result + (type == null ? 0 : type.hashCode());
-        result = prime * result + (route == null ? 0 : route.hashCode());
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "RouteIdentifierImpl{" + "context=" + context + ", type=" + type + ", route=" + route + '}';
-    }
-}
index 3ff58f0..bd0ee02 100644 (file)
@@ -9,9 +9,7 @@ package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
@@ -19,7 +17,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,15 +39,7 @@ final class RpcListener implements DOMRpcAvailabilityListener {
         Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
         LOG.debug("Adding registration for [{}]", rpcs);
 
-        final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
-
-        for (final DOMRpcIdentifier rpc : rpcs) {
-            // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
-            final RouteIdentifier<?,?,?> routeId =
-                    new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
-            routeIds.add(routeId);
-        }
-        rpcRegistry.tell(new AddOrUpdateRoutes(routeIds), ActorRef.noSender());
+        rpcRegistry.tell(new AddOrUpdateRoutes(rpcs), ActorRef.noSender());
     }
 
     @Override
@@ -58,12 +47,7 @@ final class RpcListener implements DOMRpcAvailabilityListener {
         Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
 
         LOG.debug("Removing registration for [{}]", rpcs);
-
-        final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
-        for (final DOMRpcIdentifier rpc : rpcs) {
-            routeIds.add(new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()));
-        }
-        rpcRegistry.tell(new RemoveRoutes(routeIds), ActorRef.noSender());
+        rpcRegistry.tell(new RemoveRoutes(rpcs), ActorRef.noSender());
     }
 
     @Override
index 0e68eb8..2f95131 100644 (file)
@@ -8,38 +8,94 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
-public class RoutingTable implements BucketData<RoutingTable>, Serializable {
+public final class RoutingTable implements BucketData<RoutingTable>, Serializable {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
+        private Collection<DOMRpcIdentifier> rpcs;
+        private ActorRef rpcInvoker;
+
+        // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+        // be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final RoutingTable table) {
+            rpcs = table.getRoutes();
+            rpcInvoker = table.getRpcInvoker();
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeObject(Serialization.serializedActorPath(rpcInvoker));
+
+            final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out);
+            nnout.writeInt(rpcs.size());
+            for (DOMRpcIdentifier id : rpcs) {
+                nnout.writeSchemaPath(id.getType());
+                nnout.writeYangInstanceIdentifier(id.getContextReference());
+            }
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            rpcInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+
+            final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
+            final int size = nnin.readInt();
+            rpcs = new ArrayList<>(size);
+            for (int i = 0; i < size; ++i) {
+                rpcs.add(DOMRpcIdentifier.create(nnin.readSchemaPath(), nnin.readYangInstanceIdentifier()));
+            }
+        }
+
+        private Object readResolve() {
+            return new RoutingTable(rpcInvoker, rpcs);
+        }
+    }
+
     private static final long serialVersionUID = 1L;
 
-    private final Set<RouteIdentifier<?, ?, ?>> rpcs;
+    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
+    private final Set<DOMRpcIdentifier> rpcs;
     private final ActorRef rpcInvoker;
 
-    private RoutingTable(final ActorRef rpcInvoker, final Set<RouteIdentifier<?, ?, ?>> table) {
+    RoutingTable(final ActorRef rpcInvoker, final Collection<DOMRpcIdentifier> table) {
         this.rpcInvoker = Preconditions.checkNotNull(rpcInvoker);
         this.rpcs = ImmutableSet.copyOf(table);
     }
 
-    RoutingTable(final ActorRef rpcInvoker) {
-        this(rpcInvoker, ImmutableSet.of());
-    }
-
     @Override
     public Optional<ActorRef> getWatchActor() {
         return Optional.of(rpcInvoker);
     }
 
-    public Set<RouteIdentifier<?, ?, ?>> getRoutes() {
+    public Set<DOMRpcIdentifier> getRoutes() {
         return rpcs;
     }
 
@@ -47,20 +103,24 @@ public class RoutingTable implements BucketData<RoutingTable>, Serializable {
         return rpcInvoker;
     }
 
-    RoutingTable addRpcs(final Collection<RouteIdentifier<?, ?, ?>> toAdd) {
-        final Set<RouteIdentifier<?, ?, ?>> newRpcs = new HashSet<>(rpcs);
+    RoutingTable addRpcs(final Collection<DOMRpcIdentifier> toAdd) {
+        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
         newRpcs.addAll(toAdd);
         return new RoutingTable(rpcInvoker, newRpcs);
     }
 
-    RoutingTable removeRpcs(final Collection<RouteIdentifier<?, ?, ?>> toRemove) {
-        final Set<RouteIdentifier<?, ?, ?>> newRpcs = new HashSet<>(rpcs);
+    RoutingTable removeRpcs(final Collection<DOMRpcIdentifier> toRemove) {
+        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
         newRpcs.removeAll(toRemove);
         return new RoutingTable(rpcInvoker, newRpcs);
     }
 
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
     @VisibleForTesting
-    boolean contains(final RouteIdentifier<?, ?, ?> routeId) {
+    boolean contains(final DOMRpcIdentifier routeId) {
         return rpcs.contains(routeId);
     }
 
index 54f7613..805e47a 100644 (file)
@@ -11,9 +11,9 @@ import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -23,27 +23,24 @@ import java.util.Optional;
 import java.util.Set;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
 
 /**
  * Registry to look up cluster nodes that have registered for a given RPC.
  *
  * <p>
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends BucketStore<RoutingTable> {
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
     private final ActorRef rpcRegistrar;
 
     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
-        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker));
+        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
 
@@ -98,16 +95,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
             final RoutingTable table = e.getValue().getData();
 
-            final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
-            for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
-                if (ri instanceof RouteIdentifierImpl) {
-                    final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
-                    rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
-                } else {
-                    LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
-                }
-            }
-
+            final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
                     : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
         }
@@ -140,15 +128,15 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      */
     public static class Messages {
         abstract static class AbstractRouteMessage {
-            final List<RouteIdentifier<?, ?, ?>> routeIdentifiers;
+            final List<DOMRpcIdentifier> routeIdentifiers;
 
-            AbstractRouteMessage(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
                         "Route Identifiers must be supplied");
-                this.routeIdentifiers = routeIdentifiers;
+                this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
             }
 
-            List<RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+            List<DOMRpcIdentifier> getRouteIdentifiers() {
                 return this.routeIdentifiers;
             }
 
@@ -159,13 +147,13 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
-            public AddOrUpdateRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
         public static final class RemoveRoutes extends AbstractRouteMessage {
-            public RemoveRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
index ade614b..daf9452 100644 (file)
@@ -12,23 +12,21 @@ import com.google.common.base.Verify;
 import java.io.Serializable;
 
 final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
-    private static final long serialVersionUID = 294779770032719196L;
+    private static final long serialVersionUID = 1L;
 
-    // Guaranteed to be non-null.
-    // This is kept a Long for binary compatibility of serialization format.
-    private final Long version;
+    private final long version;
 
     // Guaranteed to be non-null
     private final T data;
 
-    BucketImpl(final Long version, final T data) {
-        this.version = Preconditions.checkNotNull(version);
+    BucketImpl(final long version, final T data) {
+        this.version = version;
         this.data = Preconditions.checkNotNull(data);
     }
 
     @Override
     public long getVersion() {
-        return version.longValue();
+        return version;
     }
 
     @Override
@@ -42,7 +40,6 @@ final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializab
     }
 
     private Object readResolve() {
-        Verify.verifyNotNull(version);
         Verify.verifyNotNull(data);
         return this;
     }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java
new file mode 100644 (file)
index 0000000..7bea1a0
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getBucketsByMembersMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.removeBucketMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.updateRemoteBucketsMessage;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Address;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Convenience access to {@link BucketStoreActor}. Used mostly by {@link Gossiper}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+@VisibleForTesting
+public final class BucketStoreAccess {
+    private final ActorContext context;
+    private final Timeout timeout;
+
+    BucketStoreAccess(final ActorContext context, final Timeout timeout) {
+        this.context = Preconditions.checkNotNull(context);
+        this.timeout = Preconditions.checkNotNull(timeout);
+    }
+
+    <T extends BucketData<T>> void getBucketsByMembers(final Collection<Address> members,
+            final Consumer<Map<Address, Bucket<T>>> callback) {
+        Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout)
+            .onComplete(new OnComplete<Object>() {
+                @SuppressWarnings("unchecked")
+                @Override
+                public void onComplete(final Throwable failure, final Object success) {
+                    if (failure == null) {
+                        callback.accept((Map<Address, Bucket<T>>) success);
+                    }
+                }
+            }, context.dispatcher());
+    }
+
+    void getBucketVersions(final Consumer<Map<Address, Long>> callback) {
+        Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public void onComplete(final Throwable failure, final Object success) {
+                if (failure == null) {
+                    callback.accept((Map<Address, Long>) success);
+                }
+            }
+        }, context.dispatcher());
+    }
+
+    @SuppressWarnings("unchecked")
+    void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
+        context.parent().tell(updateRemoteBucketsMessage((Map<Address, Bucket<?>>) buckets), ActorRef.noSender());
+    }
+
+    void removeRemoteBucket(final Address addr) {
+        context.parent().tell(removeBucketMessage(addr), ActorRef.noSender());
+    }
+
+    @VisibleForTesting
+    public enum Singletons {
+        // Sent from Gossiper to BucketStore, response is an immutable Map<Address, Bucket<?>>
+        GET_ALL_BUCKETS,
+        // Sent from Gossiper to BucketStore, response is an immutable Map<Address, Long>
+        GET_BUCKET_VERSIONS,
+    }
+}
@@ -8,7 +8,8 @@
 
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
@@ -27,21 +28,16 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.SetMultimap;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -51,9 +47,15 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
  * <p>
  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
- *
  */
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
+public abstract class BucketStoreActor<T extends BucketData<T>> extends
+        AbstractUntypedPersistentActorWithMetering {
+    // Internal marker interface for messages which are just bridges to execute a method
+    @FunctionalInterface
+    private interface ExecuteInActor extends Consumer<BucketStoreActor<?>> {
+
+    }
+
     /**
      * Buckets owned by other known nodes in the cluster.
      */
@@ -86,14 +88,38 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
     private Integer incarnation;
     private boolean persisting;
 
-    public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
+    protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
         this.config = Preconditions.checkNotNull(config);
         this.initialData = Preconditions.checkNotNull(initialData);
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
     }
 
+    static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
+        return actor -> actor.getBucketsByMembers(members);
+    }
+
+    static ExecuteInActor removeBucketMessage(final Address addr) {
+        return actor -> actor.removeBucket(addr);
+    }
+
+    static ExecuteInActor updateRemoteBucketsMessage(final Map<Address, Bucket<?>> buckets) {
+        return actor -> actor.updateRemoteBuckets(buckets);
+    }
+
+    public final T getLocalData() {
+        return getLocalBucket().getData();
+    }
+
+    public final Map<Address, Bucket<T>> getRemoteBuckets() {
+        return remoteBuckets;
+    }
+
+    public final Map<Address, Long> getVersions() {
+        return versions;
+    }
+
     @Override
-    public String persistenceId() {
+    public final String persistenceId() {
         return persistenceId;
     }
 
@@ -107,12 +133,11 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void handleCommand(final Object message) throws Exception {
-        if (message instanceof GetAllBuckets) {
+        if (GET_ALL_BUCKETS == message) {
             // GetAllBuckets is used only in testing
-            receiveGetAllBuckets();
+            getSender().tell(getAllBuckets(), self());
             return;
         }
 
@@ -121,14 +146,11 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
             return;
         }
 
-        if (message instanceof GetBucketsByMembers) {
-            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
-        } else if (message instanceof GetBucketVersions) {
-            receiveGetBucketVersions();
-        } else if (message instanceof UpdateRemoteBuckets) {
-            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
-        } else if (message instanceof RemoveRemoteBucket) {
-            removeBucket(((RemoveRemoteBucket) message).getAddress());
+        if (message instanceof ExecuteInActor) {
+            ((ExecuteInActor) message).accept(this);
+        } else if (GET_BUCKET_VERSIONS == message) {
+            // FIXME: do we need to send ourselves?
+            getSender().tell(ImmutableMap.copyOf(versions), getSelf());
         } else if (message instanceof Terminated) {
             actorTerminated((Terminated) message);
         } else if (message instanceof DeleteSnapshotsSuccess) {
@@ -161,7 +183,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
     }
 
     @Override
-    protected void handleRecover(final Object message) throws Exception {
+    protected final void handleRecover(final Object message) throws Exception {
         if (message instanceof RecoveryCompleted) {
             if (incarnation != null) {
                 incarnation = incarnation + 1;
@@ -182,25 +204,47 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
-    protected RemoteRpcProviderConfig getConfig() {
+    protected final RemoteRpcProviderConfig getConfig() {
         return config;
     }
 
+    protected final void updateLocalBucket(final T data) {
+        final LocalBucket<T> local = getLocalBucket();
+        final boolean bumpIncarnation = local.setData(data);
+        versions.put(selfAddress, local.getVersion());
+
+        if (bumpIncarnation) {
+            LOG.debug("Version wrapped. incrementing incarnation");
+
+            Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+            incarnation = incarnation + 1;
+
+            persisting = true;
+            saveSnapshot(incarnation);
+        }
+    }
+
     /**
-     * Returns all the buckets the this node knows about, self owned + remote.
+     * Callback to subclasses invoked when a bucket is removed.
+     *
+     * @param address Remote address
+     * @param bucket Bucket removed
      */
-    @VisibleForTesting
-    protected void receiveGetAllBuckets() {
-        final ActorRef sender = getSender();
-        sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
-    }
+    protected abstract void onBucketRemoved(final Address address, final Bucket<T> bucket);
+
+    /**
+     * Callback to subclasses invoked when the set of remote buckets is updated.
+     *
+     * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+     */
+    protected abstract void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets);
 
     /**
      * Helper to collect all known buckets.
      *
      * @return self owned + remote buckets
      */
-    Map<Address, Bucket<T>> getAllBuckets() {
+    private Map<Address, Bucket<T>> getAllBuckets() {
         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
@@ -212,24 +256,12 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         return all;
     }
 
-    /**
-     * Returns buckets for requested members that this node knows about.
-     *
-     * @param members requested members
-     */
-    void receiveGetBucketsByMembers(final Set<Address> members) {
-        final ActorRef sender = getSender();
-        Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
-        sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
-    }
-
     /**
      * Helper to collect buckets for requested members.
      *
      * @param members requested members
-     * @return buckets for requested members
      */
-    Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
+    private void getBucketsByMembers(final Collection<Address> members) {
         Map<Address, Bucket<T>> buckets = new HashMap<>();
 
         //first add the local bucket if asked
@@ -244,16 +276,15 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
             }
         }
 
-        return buckets;
+        getSender().tell(buckets, getSelf());
     }
 
-    /**
-     * Returns versions for all buckets known.
-     */
-    void receiveGetBucketVersions() {
-        final ActorRef sender = getSender();
-        GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
-        sender.tell(reply, getSelf());
+    private void removeBucket(final Address addr) {
+        final Bucket<T> bucket = remoteBuckets.remove(addr);
+        if (bucket != null) {
+            bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+            onBucketRemoved(addr, bucket);
+        }
     }
 
     /**
@@ -262,7 +293,8 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
-    void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+    @VisibleForTesting
+    void updateRemoteBuckets(final Map<Address, Bucket<?>> receivedBuckets) {
         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
             //nothing to do
@@ -270,7 +302,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
 
         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
-        for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+        for (Entry<Address, Bucket<?>> entry : receivedBuckets.entrySet()) {
             final Address addr = entry.getKey();
 
             if (selfAddress.equals(addr)) {
@@ -278,7 +310,8 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
                 continue;
             }
 
-            final Bucket<T> receivedBucket = entry.getValue();
+            @SuppressWarnings("unchecked")
+            final Bucket<T> receivedBucket = (Bucket<T>) entry.getValue();
             if (receivedBucket == null) {
                 LOG.debug("Ignoring null bucket from {}", addr);
                 continue;
@@ -328,14 +361,6 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
-    private void removeBucket(final Address addr) {
-        final Bucket<T> bucket = remoteBuckets.remove(addr);
-        if (bucket != null) {
-            bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
-            onBucketRemoved(addr, bucket);
-        }
-    }
-
     private void actorTerminated(final Terminated message) {
         LOG.info("Actor termination {} received", message);
 
@@ -349,60 +374,13 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersist
         }
     }
 
-    /**
-     * Callback to subclasses invoked when a bucket is removed.
-     *
-     * @param address Remote address
-     * @param bucket Bucket removed
-     */
-    protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
-        // Default noop
-    }
-
-    /**
-     * Callback to subclasses invoked when the set of remote buckets is updated.
-     *
-     * @param newBuckets Map of address to new bucket. Never null, but can be empty.
-     */
-    protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
-        // Default noop
-    }
-
     @VisibleForTesting
     protected boolean isPersisting() {
         return persisting;
     }
 
-    public T getLocalData() {
-        return getLocalBucket().getData();
-    }
-
     private LocalBucket<T> getLocalBucket() {
         Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
         return localBucket;
     }
-
-    protected void updateLocalBucket(final T data) {
-        final LocalBucket<T> local = getLocalBucket();
-        final boolean bumpIncarnation = local.setData(data);
-        versions.put(selfAddress, local.getVersion());
-
-        if (bumpIncarnation) {
-            LOG.debug("Version wrapped. incrementing incarnation");
-
-            Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
-            incarnation = incarnation + 1;
-
-            persisting = true;
-            saveSnapshot(incarnation);
-        }
-    }
-
-    public Map<Address, Bucket<T>> getRemoteBuckets() {
-        return remoteBuckets;
-    }
-
-    public Map<Address, Long> getVersions() {
-        return versions;
-    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipEnvelope.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipEnvelope.java
new file mode 100644 (file)
index 0000000..a6a0c2b
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.Address;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.util.Map;
+
+final class GossipEnvelope implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Map<Address, Bucket<?>> buckets;
+    private final Address from;
+    private final Address to;
+
+    GossipEnvelope(final Address from, final Address to, final Map<Address, ? extends Bucket<?>> buckets) {
+        this.to = Preconditions.checkNotNull(to);
+        this.buckets = ImmutableMap.copyOf(buckets);
+        this.from = from;
+    }
+
+    Map<Address, Bucket<?>> buckets() {
+        return buckets;
+    }
+
+    Address from() {
+        return from;
+    }
+
+    Address to() {
+        return to;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipStatus.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipStatus.java
new file mode 100644 (file)
index 0000000..db62185
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.Address;
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.util.Map;
+
+final class GossipStatus implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Map<Address, Long> versions;
+    private final Address from;
+
+    GossipStatus(final Address from, final Map<Address, Long> versions) {
+        this.versions = ImmutableMap.copyOf(versions);
+        this.from = from;
+    }
+
+    Address from() {
+        return from;
+    }
+
+    Map<Address, Long> versions() {
+        return versions;
+    }
+}
\ No newline at end of file
index 2c47c4e..015c0a1 100644 (file)
@@ -17,8 +17,6 @@ import akka.cluster.Cluster;
 import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
@@ -27,21 +25,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -63,6 +52,13 @@ import scala.concurrent.duration.FiniteDuration;
  * for update.
  */
 public class Gossiper extends AbstractUntypedActorWithMetering {
+    private static final Object GOSSIP_TICK = new Object() {
+        @Override
+        public String toString() {
+            return "gossip tick";
+        }
+    };
+
     private final boolean autoStartGossipTicks;
     private final RemoteRpcProviderConfig config;
 
@@ -85,6 +81,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     private Cancellable gossipTask;
 
+    private BucketStoreAccess bucketStore;
+
     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
         this.config = Preconditions.checkNotNull(config);
         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
@@ -107,6 +105,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
+        bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+
         if (provider instanceof ClusterActorRefProvider ) {
             cluster = Cluster.get(getContext().system());
             cluster.subscribe(getSelf(),
@@ -121,7 +121,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
                     config.getGossipTickInterval(),                 //interval
                     getSelf(),                                      //target
-                    new Messages.GossiperMessages.GossipTick(),     //message
+                    GOSSIP_TICK,                                    //message
                     getContext().dispatcher(),                      //execution context
                     getSelf()                                       //sender
             );
@@ -138,12 +138,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         }
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     protected void handleReceive(final Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
-        if (message instanceof GossipTick) {
+        if (GOSSIP_TICK.equals(message)) {
             receiveGossipTick();
         } else if (message instanceof GossipStatus) {
             // Message from remote gossiper with its bucket versions
@@ -197,7 +196,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     private void removePeer(final Address address) {
         clusterMembers.remove(address);
         peers.remove(address);
-        getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
+        bucketStore.removeRemoteBucket(address);
     }
 
     /**
@@ -258,15 +257,53 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     @VisibleForTesting
     void receiveGossipStatus(final GossipStatus status) {
         // Don't accept messages from non-members
-        if (!peers.containsKey(status.from())) {
-            return;
+        if (peers.containsKey(status.from())) {
+            // FIXME: sender should be part of GossipStatus
+            final ActorRef sender = getSender();
+            bucketStore.getBucketVersions(versions ->  processRemoteStatus(sender, status, versions));
+        }
+    }
+
+    private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
+            final Map<Address, Long> localVersions) {
+        final Map<Address, Long> remoteVersions = status.versions();
+
+        //diff between remote list and local
+        final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
+        localIsOlder.removeAll(localVersions.keySet());
+
+        //diff between local list and remote
+        final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
+        localIsNewer.removeAll(remoteVersions.keySet());
+
+
+        for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
+            Address address = entry.getKey();
+            Long remoteVersion = entry.getValue();
+            Long localVersion = localVersions.get(address);
+            if (localVersion == null || remoteVersion == null) {
+                //this condition is taken care of by above diffs
+                continue;
+            }
+
+            if (localVersion < remoteVersion) {
+                localIsOlder.add(address);
+            } else if (localVersion > remoteVersion) {
+                localIsNewer.add(address);
+            }
         }
 
-        final ActorRef sender = getSender();
-        Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+        if (!localIsOlder.isEmpty()) {
+            remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
+        }
 
-        futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
+        if (!localIsNewer.isEmpty()) {
+            //send newer buckets to remote
+            bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
+                LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
+                remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
+            });
+        }
     }
 
     /**
@@ -275,14 +312,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param envelope contains buckets from a remote gossiper
      */
     @VisibleForTesting
-    <T extends BucketData<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
+    void receiveGossip(final GossipEnvelope envelope) {
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
             return;
         }
 
-        updateRemoteBuckets(envelope.getBuckets());
+        updateRemoteBuckets(envelope.buckets());
     }
 
     /**
@@ -291,21 +328,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param buckets map of Buckets to update
      */
     @VisibleForTesting
-    <T extends BucketData<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
-        getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
-    }
-
-    /**
-     * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
-     *
-     * @param remote     remote node to send Buckets to
-     * @param addresses  node addresses whose buckets needs to be sent
-     */
-    void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
-
-        Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
-        futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
+    void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
+        bucketStore.updateRemoteBuckets(buckets);
     }
 
     /**
@@ -315,133 +339,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     @VisibleForTesting
     void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
-
-        //Get local status from bucket store and send to remote
-        Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
-
-        LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
-
-        futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
-    }
-
-    ///
-    /// Private factories to create mappers
-    ///
-
-    private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(final Object replyMessage) {
-                if (replyMessage instanceof GetBucketVersionsReply) {
-                    GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
-                    Map<Address, Long> localVersions = reply.getVersions();
-
-                    remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Process bucket versions received from
-     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
-     * Then this method compares remote bucket versions with local bucket versions.
-     * <ul>
-     *     <li>The buckets that are newer locally, send
-     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
-     *     to remote
-     *     <li>The buckets that are older locally, send
-     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
-     *     to remote so that remote sends GossipEnvelop.
-     * </ul>
-     *
-     * @param sender the remote member
-     * @param status bucket versions from a remote member
-     * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
-     *
-     */
-    private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
-
-        final Map<Address, Long> remoteVersions = status.getVersions();
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(final Object replyMessage) {
-                if (replyMessage instanceof GetBucketVersionsReply) {
-                    GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
-                    Map<Address, Long> localVersions = reply.getVersions();
-
-                    //diff between remote list and local
-                    Set<Address> localIsOlder = new HashSet<>();
-                    localIsOlder.addAll(remoteVersions.keySet());
-                    localIsOlder.removeAll(localVersions.keySet());
-
-                    //diff between local list and remote
-                    Set<Address> localIsNewer = new HashSet<>();
-                    localIsNewer.addAll(localVersions.keySet());
-                    localIsNewer.removeAll(remoteVersions.keySet());
-
-
-                    for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
-                        Address address = entry.getKey();
-                        Long remoteVersion = entry.getValue();
-                        Long localVersion = localVersions.get(address);
-                        if (localVersion == null || remoteVersion == null) {
-                            //this condition is taken care of by above diffs
-                            continue;
-                        }
-
-                        if (localVersion < remoteVersion) {
-                            localIsOlder.add(address);
-                        } else if (localVersion > remoteVersion) {
-                            localIsNewer.add(address);
-                        }
-                    }
-
-                    if (!localIsOlder.isEmpty()) {
-                        sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
-                    }
-
-                    if (!localIsNewer.isEmpty()) {
-                        //send newer buckets to remote
-                        sendGossipTo(sender, localIsNewer);
-                    }
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
-     * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
-     * These buckets are sent to a remote member encapsulated in
-     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
-     *
-     * @param sender the remote member that sent
-     *           {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
-     *           in reply to which bucket is being sent back
-     * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
-     *
-     */
-    private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
-
-        return new Mapper<Object, Void>() {
-            @SuppressWarnings({ "rawtypes", "unchecked" })
-            @Override
-            public Void apply(final Object msg) {
-                if (msg instanceof GetBucketsByMembersReply) {
-                    Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
-                    GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
-                    sender.tell(envelope, getSelf());
-                }
-                return null;
-            }
-        };
+        bucketStore.getBucketVersions(versions -> {
+            LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
+            /*
+             * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
+             *      but can we identify which bucket is the local one?
+             */
+            remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
+        });
     }
 
     ///
index b2c5e0c..e5a2a54 100644 (file)
@@ -23,7 +23,7 @@ final class LocalBucket<T extends BucketData<T>> {
      *
      * We are keeping a boxed version here, as we stick it into a map anyway.
      */
-    private Long version;
+    private long version;
     private T data;
 
     // We bump versions only if we took a snapshot since last data update
@@ -39,7 +39,7 @@ final class LocalBucket<T extends BucketData<T>> {
         return data;
     }
 
-    Long getVersion() {
+    long getVersion() {
         return version;
     }
 
@@ -50,15 +50,11 @@ final class LocalBucket<T extends BucketData<T>> {
 
     boolean setData(final T data) {
         this.data = Preconditions.checkNotNull(data);
-        if (bumpVersion) {
-            final long next = version.longValue() + 1;
-            if ((next & 0xffff_ffffL) == 0) {
-                return true;
-            }
-
-            version = next;
-            bumpVersion = false;
+        if (!bumpVersion) {
+            return false;
         }
-        return false;
+
+        bumpVersion = false;
+        return (++version & 0xffff_ffffL) == 0;
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
deleted file mode 100644 (file)
index 361f5b7..0000000
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.registry.gossip;
-
-import akka.actor.Address;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
-
-/**
- * These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors.
- */
-public class Messages {
-
-    public static class BucketStoreMessages {
-
-        public static final class GetAllBuckets implements Serializable {
-            private static final long serialVersionUID = 1L;
-        }
-
-        public static final class GetBucketsByMembers implements Serializable {
-            private static final long serialVersionUID = 1L;
-            private final Set<Address> members;
-
-            public GetBucketsByMembers(final Set<Address> members) {
-                Preconditions.checkArgument(members != null, "members can not be null");
-                this.members = ImmutableSet.copyOf(members);
-            }
-
-            public Set<Address> getMembers() {
-                return members;
-            }
-        }
-
-        public static class ContainsBuckets<T extends BucketData<T>> implements Serializable {
-            private static final long serialVersionUID = -4940160367495308286L;
-
-            private final Map<Address, Bucket<T>> buckets;
-
-            protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
-                Preconditions.checkArgument(buckets != null, "buckets can not be null");
-                this.buckets = Collections.unmodifiableMap(new HashMap<>(buckets));
-            }
-
-            public final Map<Address, Bucket<T>> getBuckets() {
-                return buckets;
-            }
-        }
-
-        public static final class GetAllBucketsReply<T extends BucketData<T>> extends ContainsBuckets<T> {
-            private static final long serialVersionUID = 1L;
-
-            public GetAllBucketsReply(final Map<Address, Bucket<T>> buckets) {
-                super(buckets);
-            }
-        }
-
-        public static final class GetBucketsByMembersReply<T extends BucketData<T>> extends ContainsBuckets<T>  {
-            private static final long serialVersionUID = 1L;
-
-            public GetBucketsByMembersReply(final Map<Address, Bucket<T>> buckets) {
-                super(buckets);
-            }
-        }
-
-        public static final class GetBucketVersions implements Serializable {
-            private static final long serialVersionUID = 1L;
-        }
-
-        public static class ContainsBucketVersions implements Serializable {
-            private static final long serialVersionUID = -8172148925383801613L;
-
-            Map<Address, Long> versions;
-
-            public ContainsBucketVersions(final Map<Address, Long> versions) {
-                Preconditions.checkArgument(versions != null, "versions can not be null or empty");
-
-                this.versions = ImmutableMap.copyOf(versions);
-            }
-
-            public Map<Address, Long> getVersions() {
-                return versions;
-            }
-        }
-
-        public static final class GetBucketVersionsReply extends ContainsBucketVersions {
-            private static final long serialVersionUID = 1L;
-
-            public GetBucketVersionsReply(final Map<Address, Long> versions) {
-                super(versions);
-            }
-        }
-
-        public static final class UpdateRemoteBuckets<T extends BucketData<T>> extends ContainsBuckets<T> {
-            private static final long serialVersionUID = 1L;
-
-            public UpdateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
-                super(buckets);
-            }
-        }
-
-        /**
-         * Message sent from the gossiper to its parent, therefore not Serializable, requesting removal
-         * of a bucket corresponding to an address.
-         */
-        public static final class RemoveRemoteBucket {
-            private final Address address;
-
-            public RemoveRemoteBucket(final Address address) {
-                this.address = Preconditions.checkNotNull(address);
-            }
-
-            public Address getAddress() {
-                return address;
-            }
-        }
-    }
-
-    public static class GossiperMessages {
-        public static class Tick implements Serializable {
-            private static final long serialVersionUID = -4770935099506366773L;
-        }
-
-        public static final class GossipTick extends Tick {
-            private static final long serialVersionUID = 5803354404380026143L;
-        }
-
-        public static final class GossipStatus extends ContainsBucketVersions {
-            private static final long serialVersionUID = -593037395143883265L;
-
-            private final Address from;
-
-            public GossipStatus(final Address from, final Map<Address, Long> versions) {
-                super(versions);
-                this.from = from;
-            }
-
-            public Address from() {
-                return from;
-            }
-        }
-
-        public static final class GossipEnvelope<T extends BucketData<T>> extends ContainsBuckets<T> {
-            private static final long serialVersionUID = 8346634072582438818L;
-
-            private final Address from;
-            private final Address to;
-
-            public GossipEnvelope(final Address from, final Address to, final Map<Address, Bucket<T>> buckets) {
-                super(buckets);
-                Preconditions.checkArgument(to != null, "Recipient of message must not be null");
-                this.to = to;
-                this.from = from;
-            }
-
-            public Address from() {
-                return from;
-            }
-
-            public Address to() {
-                return to;
-            }
-        }
-    }
-}
index 339d4b1..eabb170 100644 (file)
@@ -15,10 +15,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,8 +27,6 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final String NULL_CONSTANT = "null";
-
     private static final String LOCAL_CONSTANT = "local";
 
     private static final String ROUTE_CONSTANT = "route:";
@@ -47,9 +45,9 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     public Set<String> getGlobalRpc() {
         RoutingTable table = rpcRegistry.getLocalData();
         Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
-        for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
-            if (route.getRoute() == null) {
-                globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT);
+        for (DOMRpcIdentifier route : table.getRoutes()) {
+            if (route.getContextReference().isEmpty()) {
+                globalRpc.add(route.getType().toString());
             }
         }
 
@@ -61,11 +59,10 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     public Set<String> getLocalRegisteredRoutedRpc() {
         RoutingTable table = rpcRegistry.getLocalData();
         Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
-        for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
-            if (route.getRoute() != null) {
+        for (DOMRpcIdentifier route : table.getRoutes()) {
+            if (!route.getContextReference().isEmpty()) {
                 StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
-                builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null
-                    ? route.getType().toString() : NULL_CONSTANT);
+                builder.append(route.getContextReference().toString()).append(NAME_CONSTANT).append(route.getType());
                 routedRpc.add(builder.toString());
             }
         }
@@ -111,15 +108,14 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
      */
     private static Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
                                                       final String address) {
-        Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+        Set<DOMRpcIdentifier> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
-        for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
-            if (route.getRoute() != null) {
-                String routeString = route.getRoute().toString();
+        for (DOMRpcIdentifier route : routes) {
+            if (!route.getContextReference().isEmpty()) {
+                String routeString = route.getContextReference().toString();
                 if (routeString.contains(routeName)) {
                     StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
-                    builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null
-                        ? route.getType().toString() : NULL_CONSTANT);
+                    builder.append(routeString).append(NAME_CONSTANT).append(route.getType());
                     rpcMap.put(builder.toString(), address);
                 }
             }
@@ -132,15 +128,14 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
      */
     private static Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
                                                        final String address) {
-        Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+        Set<DOMRpcIdentifier> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
-        for (RpcRouter.RouteIdentifier<?, ?, ?> route : routes) {
-            if (route.getType() != null) {
+        for (DOMRpcIdentifier route : routes) {
+            if (!route.getContextReference().isEmpty()) {
                 String type = route.getType().toString();
                 if (type.contains(name)) {
                     StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
-                    builder.append(route.getRoute() != null ? route.getRoute().toString() : NULL_CONSTANT)
-                        .append(NAME_CONSTANT).append(type);
+                    builder.append(route.getContextReference()).append(NAME_CONSTANT).append(type);
                     rpcMap.put(builder.toString(), address);
                 }
             }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/utils/ConditionalProbe.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/utils/ConditionalProbe.java
deleted file mode 100644 (file)
index 462a514..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.utils;
-
-import akka.actor.ActorRef;
-import com.google.common.base.Predicate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConditionalProbe {
-    private final ActorRef actorRef;
-    private final Predicate<Object> predicate;
-    Logger log = LoggerFactory.getLogger(ConditionalProbe.class);
-
-    public ConditionalProbe(ActorRef actorRef, Predicate<Object> predicate) {
-        this.actorRef = actorRef;
-        this.predicate = predicate;
-    }
-
-    public void tell(Object message, ActorRef sender) {
-        if (predicate.apply(message)) {
-            log.info("sending message to probe {}", message);
-            actorRef.tell(message, sender);
-        }
-    }
-}
index 0c3a57e..8406273 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import static org.junit.Assert.fail;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -27,6 +29,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -40,20 +43,15 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -183,7 +181,7 @@ public class RpcRegistryTest {
 
         // Add rpc on node 1
 
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
+        List<DOMRpcIdentifier> addedRouteIds = createRouteIds();
 
         registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
 
@@ -217,7 +215,7 @@ public class RpcRegistryTest {
 
         LOG.info("testRpcAddRemoveInCluster starting");
 
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
+        List<DOMRpcIdentifier> addedRouteIds = createRouteIds();
 
         Address node1Address = node1.provider().getDefaultAddress();
 
@@ -249,7 +247,7 @@ public class RpcRegistryTest {
             buckets = retrieveBuckets(registry1, testKit, address);
 
             try {
-                verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
+                verifyBucket(buckets.get(address), Collections.emptyList());
                 break;
             } catch (AssertionError e) {
                 if (++numTries >= 50) {
@@ -269,14 +267,14 @@ public class RpcRegistryTest {
         final JavaTestKit testKit = new JavaTestKit(node3);
 
         // Add rpc on node 1
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
+        List<DOMRpcIdentifier> addedRouteIds1 = createRouteIds();
         registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());
 
         final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
             UpdateRemoteEndpoints.class);
 
         // Add rpc on node 2
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
+        List<DOMRpcIdentifier> addedRouteIds2 = createRouteIds();
         registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());
 
         final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
@@ -320,17 +318,16 @@ public class RpcRegistryTest {
     }
 
     private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final JavaTestKit testKit) {
-        bucketStore.tell(new GetBucketVersions(), testKit.getRef());
-        GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
-                GetBucketVersionsReply.class);
-        return reply.getVersions();
+        bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef());
+        @SuppressWarnings("unchecked")
+        final Map<Address, Long> reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), Map.class);
+        return reply;
     }
 
-    private static void verifyBucket(final Bucket<RoutingTable> bucket,
-            final List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+    private static void verifyBucket(final Bucket<RoutingTable> bucket, final List<DOMRpcIdentifier> expRouteIds) {
         RoutingTable table = bucket.getData();
         Assert.assertNotNull("Bucket RoutingTable is null", table);
-        for (RouteIdentifier<?, ?, ?> r : expRouteIds) {
+        for (DOMRpcIdentifier r : expRouteIds) {
             if (!table.contains(r)) {
                 Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
             }
@@ -343,12 +340,11 @@ public class RpcRegistryTest {
             final JavaTestKit testKit, final Address... addresses) {
         int numTries = 0;
         while (true) {
-            bucketStore.tell(new GetAllBuckets(), testKit.getRef());
+            bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());
             @SuppressWarnings("unchecked")
-            GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
-                    GetAllBucketsReply.class);
+            Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+                    Map.class);
 
-            Map<Address, Bucket<RoutingTable>> buckets = reply.getBuckets();
             boolean foundAll = true;
             for (Address addr : addresses) {
                 Bucket<RoutingTable> bucket = buckets.get(addr);
@@ -376,30 +372,29 @@ public class RpcRegistryTest {
         final JavaTestKit testKit = new JavaTestKit(node1);
 
         final int nRoutes = 500;
-        final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
+        final Collection<DOMRpcIdentifier> added = new ArrayList<>(nRoutes);
         for (int i = 0; i < nRoutes; i++) {
-            final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
-                    new QName(new URI("/mockrpc"), "type" + i), null);
-            added[i] = routeId;
+            final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true,
+                    new QName(new URI("/mockrpc"), "type" + i)));
+            added.add(routeId);
 
             //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-            registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
+            registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)),
                     ActorRef.noSender());
         }
 
-        GetAllBuckets getAllBuckets = new GetAllBuckets();
         FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
         int numTries = 0;
         while (true) {
-            registry1.tell(getAllBuckets, testKit.getRef());
+            registry1.tell(GET_ALL_BUCKETS, testKit.getRef());
             @SuppressWarnings("unchecked")
-            GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+            Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(duration, Map.class);
 
-            Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
+            Bucket<RoutingTable> localBucket = buckets.values().iterator().next();
             RoutingTable table = localBucket.getData();
             if (table != null && table.size() == nRoutes) {
-                for (RouteIdentifier<?, ?, ?> r : added) {
-                    Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
+                for (DOMRpcIdentifier r : added) {
+                    Assert.assertTrue("RoutingTable contains " + r, table.contains(r));
                 }
 
                 break;
@@ -413,10 +408,10 @@ public class RpcRegistryTest {
         }
     }
 
-    private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
+    private List<DOMRpcIdentifier> createRouteIds() throws URISyntaxException {
         QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>(1);
-        routeIds.add(new RouteIdentifierImpl(null, type, null));
+        List<DOMRpcIdentifier> routeIds = new ArrayList<>(1);
+        routeIds.add(DOMRpcIdentifier.create(SchemaPath.create(true, type)));
         return routeIds;
     }
 }
index 59dd29b..4898b27 100644 (file)
@@ -7,39 +7,23 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
-import static akka.actor.ActorRef.noSender;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.Status.Success;
-import akka.pattern.Patterns;
-import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import scala.concurrent.Await;
-import scala.concurrent.duration.Duration;
 
 public class BucketStoreTest {
 
@@ -58,8 +42,6 @@ public class BucketStoreTest {
 
     private static ActorSystem system;
 
-    private JavaTestKit kit;
-
     @BeforeClass
     public static void setup() {
         system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
@@ -71,74 +53,55 @@ public class BucketStoreTest {
         JavaTestKit.shutdownActorSystem(system);
     }
 
-    @Before
-    public void before() {
-        kit = new JavaTestKit(system);
-    }
-
-    @After
-    public void after() {
-        kit.shutdown(system);
-    }
-
     /**
      * Given remote buckets, should merge with local copy of remote buckets.
      */
     @Test
-    public void testReceiveUpdateRemoteBuckets() throws Exception {
+    public void testReceiveUpdateRemoteBuckets() {
+
+        final BucketStoreActor<T> store = createStore();
 
-        final ActorRef store = createStore();
         Address localAddress = system.provider().getDefaultAddress();
         Bucket<T> localBucket = new BucketImpl<>(0L, new T());
 
-        Address a1 = new Address("tcp", "system1");
-        Address a2 = new Address("tcp", "system2");
-        Address a3 = new Address("tcp", "system3");
+        final Address a1 = new Address("tcp", "system1");
+        final Address a2 = new Address("tcp", "system2");
+        final Address a3 = new Address("tcp", "system3");
 
-        Bucket<T> b1 = new BucketImpl<>(0L, new T());
-        Bucket<T> b2 = new BucketImpl<>(0L, new T());
-        Bucket<T> b3 = new BucketImpl<>(0L, new T());
-
-        Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
-        remoteBuckets.put(a1, b1);
-        remoteBuckets.put(a2, b2);
-        remoteBuckets.put(a3, b3);
-        remoteBuckets.put(localAddress, localBucket);
-
-        Await.result(Patterns.ask(store, new WaitUntilDonePersisting(),
-                Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf());
+        final Bucket<T> b1 = new BucketImpl<>(0L, new T());
+        final Bucket<T> b2 = new BucketImpl<>(0L, new T());
+        final Bucket<T> b3 = new BucketImpl<>(0L, new T());
 
         //Given remote buckets
-        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+        store.updateRemoteBuckets(ImmutableMap.of(a1, b1, a2, b2, localAddress, localBucket));
 
-        //Should contain local bucket
-        //Should contain 4 entries i.e a1, a2, a3, local
-        Map<Address, Bucket<T>> remoteBucketsInStore = getBuckets(store);
-        Assert.assertTrue(remoteBucketsInStore.size() == 4);
+        //Should NOT contain local bucket
+        //Should contain ONLY 3 entries i.e a1, a2
+        Map<Address, Bucket<T>> remoteBucketsInStore = store.getRemoteBuckets();
+        Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
+        Assert.assertTrue(remoteBucketsInStore.size() == 2);
 
         //Add a new remote bucket
         Address a4 = new Address("tcp", "system4");
         Bucket<T> b4 = new BucketImpl<>(0L, new T());
-        remoteBuckets.clear();
-        remoteBuckets.put(a4, b4);
-        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+        store.updateRemoteBuckets(ImmutableMap.of(a4, b4));
 
         //Should contain a4
-        //Should contain 5 entries now i.e a1, a2, a3, a4, local
-        remoteBucketsInStore = getBuckets(store);
+        //Should contain 4 entries now i.e a1, a2, a4
+        remoteBucketsInStore = store.getRemoteBuckets();
         Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
-        Assert.assertTrue(remoteBucketsInStore.size() == 5);
+        Assert.assertTrue(remoteBucketsInStore.size() == 3);
 
         //Update a bucket
         Bucket<T> b3New = new BucketImpl<>(0L, new T());
-        remoteBuckets.clear();
+        Map<Address, Bucket<?>> remoteBuckets = new HashMap<>(3);
         remoteBuckets.put(a3, b3New);
         remoteBuckets.put(a1, null);
         remoteBuckets.put(a2, null);
-        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+        store.updateRemoteBuckets(remoteBuckets);
 
         //Should only update a3
-        remoteBucketsInStore = getBuckets(store);
+        remoteBucketsInStore = store.getRemoteBuckets();
         Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
 
@@ -147,11 +110,11 @@ public class BucketStoreTest {
         Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
         Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
         Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
-        Assert.assertTrue(remoteBucketsInStore.size() == 5);
+        Assert.assertTrue(remoteBucketsInStore.size() == 4);
 
         //Should update versions map
         //versions map contains versions for all remote buckets (4).
-        Map<Address, Long> versionsInStore = getVersions(store);
+        Map<Address, Long> versionsInStore = store.getVersions();
         Assert.assertEquals(4, versionsInStore.size());
         Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
         Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
@@ -161,12 +124,13 @@ public class BucketStoreTest {
         //Send older version of bucket
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3);
-        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+        store.updateRemoteBuckets(remoteBuckets);
 
         //Should NOT update a3
-        remoteBucketsInStore = getBuckets(store);
+        remoteBucketsInStore = store.getRemoteBuckets();
         b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
+
     }
 
     /**
@@ -174,63 +138,28 @@ public class BucketStoreTest {
      *
      * @return instance of BucketStore class
      */
-    private ActorRef createStore() {
-        return kit.childActorOf(Props.create(TestingBucketStore.class,
-            new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
-    }
-
-    @SuppressWarnings("unchecked")
-    private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
-        final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
-                Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
-        return result.getBuckets();
+    private static BucketStoreActor<T> createStore() {
+        final Props props = Props.create(TestingBucketStoreActor.class,
+                new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T());
+        return TestActorRef.<BucketStoreActor<T>>create(system, props, "testStore").underlyingActor();
     }
 
-    @SuppressWarnings("unchecked")
-    private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
-        return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
-            Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
-    }
+    private static final class TestingBucketStoreActor extends BucketStoreActor<T> {
 
-    private static final class TestingBucketStore extends BucketStore<T> {
-
-        private final List<ActorRef> toNotify = new ArrayList<>();
-
-        TestingBucketStore(final RemoteRpcProviderConfig config,
-                                  final String persistenceId,
-                                  final T initialData) {
+        protected TestingBucketStoreActor(final RemoteRpcProviderConfig config,
+                                          final String persistenceId,
+                                          final T initialData) {
             super(config, persistenceId, initialData);
         }
 
         @Override
-        protected void handleCommand(Object message) throws Exception {
-            if (message instanceof WaitUntilDonePersisting) {
-                handlePersistAsk();
-            } else if (message instanceof SaveSnapshotSuccess) {
-                super.handleCommand(message);
-                handleSnapshotSuccess();
-            } else {
-                super.handleCommand(message);
-            }
-        }
+        protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
 
-        private void handlePersistAsk() {
-            if (isPersisting()) {
-                toNotify.add(getSender());
-            } else {
-                getSender().tell(new Success(null), noSender());
-            }
         }
 
-        private void handleSnapshotSuccess() {
-            toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
-        }
-    }
-
-    /**
-     * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
-     */
-    private static final class WaitUntilDonePersisting {
+        @Override
+        protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
 
+        }
     }
 }
index 9fccb06..f1e7fea 100644 (file)
@@ -31,8 +31,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
 
 
 public class GossiperTest {
@@ -63,7 +61,6 @@ public class GossiperTest {
     @After
     public void resetMocks() {
         reset(mockGossiper);
-
     }
 
     @Test
@@ -85,7 +82,6 @@ public class GossiperTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore() {
-
         Address nonMember = new Address("tcp", "non-member");
         GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class));
 
@@ -95,14 +91,12 @@ public class GossiperTest {
         verify(mockGossiper, times(0)).getSender();
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings("unchecked")
     @Test
     public void testReceiveGossipWhenNotAddressedToSelfShouldIgnore() {
-        Address notSelf = new Address("tcp", "not-self");
-
-        GossipEnvelope envelope = new GossipEnvelope(notSelf, notSelf, mock(Map.class));
         doNothing().when(mockGossiper).updateRemoteBuckets(anyMap());
-        mockGossiper.receiveGossip(envelope);
+        Address notSelf = new Address("tcp", "not-self");
+        mockGossiper.receiveGossip(new GossipEnvelope(notSelf, notSelf, mock(Map.class)));
         verify(mockGossiper, times(0)).updateRemoteBuckets(anyMap());
     }
 
@@ -112,7 +106,10 @@ public class GossiperTest {
      * @return instance of Gossiper class
      */
     private static Gossiper createGossiper() {
-        final Props props = Gossiper.testProps(new RemoteRpcProviderConfig(system.settings().config()));
+        final RemoteRpcProviderConfig config =
+                new RemoteRpcProviderConfig.Builder("unit-test")
+                        .withConfigReader(ConfigFactory::load).build();
+        final Props props = Gossiper.testProps(config);
         final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
 
         return testRef.underlyingActor();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.