From 5b66dd8f5e3467a07e77b20fe696b29993ce5565 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 18 Jan 2017 16:01:21 +0100 Subject: [PATCH] BUG-7594: Rework sal-remoterpc-connector messages This breaks compatibility by using DOMRpcIdentifier directly in transferred messages. Since we are breaking compatibility, we can also rework the messages and their locations, limiting their visiblity and features (such as Serializable). RoutingTable no longer uses RouteIdentifier, but rather relies on DOMRpcIdentifier, which is serialized using NormalizedNodeDataInput/Output primitives, so the serialization format is not dependent on the package DOMRpcIdentifier comes from and can be compatibly switched to mdsal-provided one. Change-Id: Idf083f9d288be9c9684c7e8e8bd99fbaff0ad4ce Signed-off-by: Robert Varga Signed-off-by: Tomas Cere --- .../md-sal/sal-remoterpc-connector/pom.xml | 4 - .../remote/rpc/RouteIdentifierImpl.java | 98 -------- .../controller/remote/rpc/RpcListener.java | 20 +- .../remote/rpc/registry/RoutingTable.java | 88 +++++-- .../remote/rpc/registry/RpcRegistry.java | 36 +-- .../rpc/registry/gossip/BucketImpl.java | 13 +- .../registry/gossip/BucketStoreAccess.java | 85 +++++++ ...BucketStore.java => BucketStoreActor.java} | 202 +++++++-------- .../rpc/registry/gossip/GossipEnvelope.java | 40 +++ .../rpc/registry/gossip/GossipStatus.java | 33 +++ .../remote/rpc/registry/gossip/Gossiper.java | 237 ++++++------------ .../rpc/registry/gossip/LocalBucket.java | 18 +- .../remote/rpc/registry/gossip/Messages.java | 178 ------------- .../mbeans/RemoteRpcRegistryMXBeanImpl.java | 37 ++- .../controller/utils/ConditionalProbe.java | 32 --- .../remote/rpc/registry/RpcRegistryTest.java | 69 +++-- .../rpc/registry/gossip/BucketStoreTest.java | 153 +++-------- .../rpc/registry/gossip/GossiperTest.java | 17 +- 18 files changed, 515 insertions(+), 845 deletions(-) delete mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java rename opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/{BucketStore.java => BucketStoreActor.java} (76%) create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipEnvelope.java create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipStatus.java delete mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java delete mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/utils/ConditionalProbe.java diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index 23e6e689b3..d477391a95 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -60,10 +60,6 @@ akka-persistence_${scala.version} - - org.opendaylight.controller - sal-connector-api - org.opendaylight.controller sal-common-util diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java deleted file mode 100644 index 344649adb0..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc; - -import com.google.common.base.Preconditions; -import java.io.Serializable; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; - -public class RouteIdentifierImpl - implements RpcRouter.RouteIdentifier, Serializable { - private static final long serialVersionUID = 1L; - - private final QName context; - private final QName type; - private final YangInstanceIdentifier route; - - public RouteIdentifierImpl(final QName context, final QName type, final YangInstanceIdentifier route) { - Preconditions.checkNotNull(type, "Rpc type should not be null"); - this.context = context; - this.type = type; - this.route = route; - } - - @Override - public QName getContext() { - return context; - } - - @Override - public QName getType() { - return type; - } - - @Override - public YangInstanceIdentifier getRoute() { - return route; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - final RouteIdentifierImpl that = (RouteIdentifierImpl) obj; - - if (context == null) { - if (that.getContext() != null) { - return false; - } - } else if (!context.equals(that.context)) { - return false; - } - - if (route == null) { - if (that.getRoute() != null) { - return false; - } - } else if (!route.equals(that.route)) { - return false; - } - - if (type == null) { - if (that.getType() != null) { - return false; - } - } else if (!type.equals(that.type)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 0; - result = prime * result + (context == null ? 0 : context.hashCode()); - result = prime * result + (type == null ? 0 : type.hashCode()); - result = prime * result + (route == null ? 0 : route.hashCode()); - return result; - } - - @Override - public String toString() { - return "RouteIdentifierImpl{" + "context=" + context + ", type=" + type + ", route=" + route + '}'; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java index 3ff58f02e3..bd0ee02147 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java @@ -9,9 +9,7 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; import com.google.common.base.Preconditions; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; @@ -19,7 +17,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,15 +39,7 @@ final class RpcListener implements DOMRpcAvailabilityListener { Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null."); LOG.debug("Adding registration for [{}]", rpcs); - final List> routeIds = new ArrayList<>(rpcs.size()); - - for (final DOMRpcIdentifier rpc : rpcs) { - // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly. - final RouteIdentifier routeId = - new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()); - routeIds.add(routeId); - } - rpcRegistry.tell(new AddOrUpdateRoutes(routeIds), ActorRef.noSender()); + rpcRegistry.tell(new AddOrUpdateRoutes(rpcs), ActorRef.noSender()); } @Override @@ -58,12 +47,7 @@ final class RpcListener implements DOMRpcAvailabilityListener { Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null."); LOG.debug("Removing registration for [{}]", rpcs); - - final List> routeIds = new ArrayList<>(rpcs.size()); - for (final DOMRpcIdentifier rpc : rpcs) { - routeIds.add(new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference())); - } - rpcRegistry.tell(new RemoveRoutes(routeIds), ActorRef.noSender()); + rpcRegistry.tell(new RemoveRoutes(rpcs), ActorRef.noSender()); } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index 0e68eb8709..2f95131757 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -8,38 +8,94 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; +import akka.serialization.JavaSerializer; +import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData; -import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; -public class RoutingTable implements BucketData, Serializable { +public final class RoutingTable implements BucketData, Serializable { + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.") + private Collection rpcs; + private ActorRef rpcInvoker; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final RoutingTable table) { + rpcs = table.getRoutes(); + rpcInvoker = table.getRpcInvoker(); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeObject(Serialization.serializedActorPath(rpcInvoker)); + + final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out); + nnout.writeInt(rpcs.size()); + for (DOMRpcIdentifier id : rpcs) { + nnout.writeSchemaPath(id.getType()); + nnout.writeYangInstanceIdentifier(id.getContextReference()); + } + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + rpcInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject()); + + final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in); + final int size = nnin.readInt(); + rpcs = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + rpcs.add(DOMRpcIdentifier.create(nnin.readSchemaPath(), nnin.readYangInstanceIdentifier())); + } + } + + private Object readResolve() { + return new RoutingTable(rpcInvoker, rpcs); + } + } + private static final long serialVersionUID = 1L; - private final Set> rpcs; + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.") + private final Set rpcs; private final ActorRef rpcInvoker; - private RoutingTable(final ActorRef rpcInvoker, final Set> table) { + RoutingTable(final ActorRef rpcInvoker, final Collection table) { this.rpcInvoker = Preconditions.checkNotNull(rpcInvoker); this.rpcs = ImmutableSet.copyOf(table); } - RoutingTable(final ActorRef rpcInvoker) { - this(rpcInvoker, ImmutableSet.of()); - } - @Override public Optional getWatchActor() { return Optional.of(rpcInvoker); } - public Set> getRoutes() { + public Set getRoutes() { return rpcs; } @@ -47,20 +103,24 @@ public class RoutingTable implements BucketData, Serializable { return rpcInvoker; } - RoutingTable addRpcs(final Collection> toAdd) { - final Set> newRpcs = new HashSet<>(rpcs); + RoutingTable addRpcs(final Collection toAdd) { + final Set newRpcs = new HashSet<>(rpcs); newRpcs.addAll(toAdd); return new RoutingTable(rpcInvoker, newRpcs); } - RoutingTable removeRpcs(final Collection> toRemove) { - final Set> newRpcs = new HashSet<>(rpcs); + RoutingTable removeRpcs(final Collection toRemove) { + final Set newRpcs = new HashSet<>(rpcs); newRpcs.removeAll(toRemove); return new RoutingTable(rpcInvoker, newRpcs); } + private Object writeReplace() { + return new Proxy(this); + } + @VisibleForTesting - boolean contains(final RouteIdentifier routeId) { + boolean contains(final DOMRpcIdentifier routeId) { return rpcs.contains(routeId); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 54f7613209..805e47a0dd 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -11,9 +11,9 @@ import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -23,27 +23,24 @@ import java.util.Optional; import java.util.Set; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; -import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; /** * Registry to look up cluster nodes that have registered for a given RPC. * *

- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this + * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this * cluster wide information. */ -public class RpcRegistry extends BucketStore { +public class RpcRegistry extends BucketStoreActor { private final ActorRef rpcRegistrar; public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { - super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker)); + super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of())); this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); } @@ -98,16 +95,7 @@ public class RpcRegistry extends BucketStore { for (Entry> e : buckets.entrySet()) { final RoutingTable table = e.getValue().getData(); - final List rpcs = new ArrayList<>(table.getRoutes().size()); - for (RouteIdentifier ri : table.getRoutes()) { - if (ri instanceof RouteIdentifierImpl) { - final RouteIdentifierImpl id = (RouteIdentifierImpl) ri; - rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute())); - } else { - LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey()); - } - } - + final Collection rpcs = table.getRoutes(); endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs))); } @@ -140,15 +128,15 @@ public class RpcRegistry extends BucketStore { */ public static class Messages { abstract static class AbstractRouteMessage { - final List> routeIdentifiers; + final List routeIdentifiers; - AbstractRouteMessage(final List> routeIdentifiers) { + AbstractRouteMessage(final Collection routeIdentifiers) { Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), "Route Identifiers must be supplied"); - this.routeIdentifiers = routeIdentifiers; + this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers); } - List> getRouteIdentifiers() { + List getRouteIdentifiers() { return this.routeIdentifiers; } @@ -159,13 +147,13 @@ public class RpcRegistry extends BucketStore { } public static final class AddOrUpdateRoutes extends AbstractRouteMessage { - public AddOrUpdateRoutes(final List> routeIdentifiers) { + public AddOrUpdateRoutes(final Collection routeIdentifiers) { super(routeIdentifiers); } } public static final class RemoveRoutes extends AbstractRouteMessage { - public RemoveRoutes(final List> routeIdentifiers) { + public RemoveRoutes(final Collection routeIdentifiers) { super(routeIdentifiers); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index ade614b8d7..daf945289c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -12,23 +12,21 @@ import com.google.common.base.Verify; import java.io.Serializable; final class BucketImpl> implements Bucket, Serializable { - private static final long serialVersionUID = 294779770032719196L; + private static final long serialVersionUID = 1L; - // Guaranteed to be non-null. - // This is kept a Long for binary compatibility of serialization format. - private final Long version; + private final long version; // Guaranteed to be non-null private final T data; - BucketImpl(final Long version, final T data) { - this.version = Preconditions.checkNotNull(version); + BucketImpl(final long version, final T data) { + this.version = version; this.data = Preconditions.checkNotNull(data); } @Override public long getVersion() { - return version.longValue(); + return version; } @Override @@ -42,7 +40,6 @@ final class BucketImpl> implements Bucket, Serializab } private Object readResolve() { - Verify.verifyNotNull(version); Verify.verifyNotNull(data); return this; } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java new file mode 100644 index 0000000000..7bea1a0b0b --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getBucketsByMembersMessage; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.removeBucketMessage; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.updateRemoteBucketsMessage; + +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.Address; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.Map; +import java.util.function.Consumer; + +/** + * Convenience access to {@link BucketStoreActor}. Used mostly by {@link Gossiper}. + * + * @author Robert Varga + */ +@Beta +@VisibleForTesting +public final class BucketStoreAccess { + private final ActorContext context; + private final Timeout timeout; + + BucketStoreAccess(final ActorContext context, final Timeout timeout) { + this.context = Preconditions.checkNotNull(context); + this.timeout = Preconditions.checkNotNull(timeout); + } + + > void getBucketsByMembers(final Collection

members, + final Consumer>> callback) { + Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout) + .onComplete(new OnComplete() { + @SuppressWarnings("unchecked") + @Override + public void onComplete(final Throwable failure, final Object success) { + if (failure == null) { + callback.accept((Map>) success); + } + } + }, context.dispatcher()); + } + + void getBucketVersions(final Consumer> callback) { + Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete() { + @SuppressWarnings("unchecked") + @Override + public void onComplete(final Throwable failure, final Object success) { + if (failure == null) { + callback.accept((Map) success); + } + } + }, context.dispatcher()); + } + + @SuppressWarnings("unchecked") + void updateRemoteBuckets(final Map> buckets) { + context.parent().tell(updateRemoteBucketsMessage((Map>) buckets), ActorRef.noSender()); + } + + void removeRemoteBucket(final Address addr) { + context.parent().tell(removeBucketMessage(addr), ActorRef.noSender()); + } + + @VisibleForTesting + public enum Singletons { + // Sent from Gossiper to BucketStore, response is an immutable Map> + GET_ALL_BUCKETS, + // Sent from Gossiper to BucketStore, response is an immutable Map + GET_BUCKET_VERSIONS, + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java similarity index 76% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java index 70f4053723..6fbe6ceb4d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java @@ -8,7 +8,8 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS; import akka.actor.ActorRef; import akka.actor.ActorRefProvider; @@ -27,21 +28,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.SetMultimap; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; +import java.util.function.Consumer; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; /** * A store that syncs its data across nodes in the cluster. @@ -51,9 +47,15 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto *

* Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol). * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. - * */ -public class BucketStore> extends AbstractUntypedPersistentActorWithMetering { +public abstract class BucketStoreActor> extends + AbstractUntypedPersistentActorWithMetering { + // Internal marker interface for messages which are just bridges to execute a method + @FunctionalInterface + private interface ExecuteInActor extends Consumer> { + + } + /** * Buckets owned by other known nodes in the cluster. */ @@ -86,14 +88,38 @@ public class BucketStore> extends AbstractUntypedPersist private Integer incarnation; private boolean persisting; - public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { + protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { this.config = Preconditions.checkNotNull(config); this.initialData = Preconditions.checkNotNull(initialData); this.persistenceId = Preconditions.checkNotNull(persistenceId); } + static ExecuteInActor getBucketsByMembersMessage(final Collection

members) { + return actor -> actor.getBucketsByMembers(members); + } + + static ExecuteInActor removeBucketMessage(final Address addr) { + return actor -> actor.removeBucket(addr); + } + + static ExecuteInActor updateRemoteBucketsMessage(final Map> buckets) { + return actor -> actor.updateRemoteBuckets(buckets); + } + + public final T getLocalData() { + return getLocalBucket().getData(); + } + + public final Map> getRemoteBuckets() { + return remoteBuckets; + } + + public final Map getVersions() { + return versions; + } + @Override - public String persistenceId() { + public final String persistenceId() { return persistenceId; } @@ -107,12 +133,11 @@ public class BucketStore> extends AbstractUntypedPersist } } - @SuppressWarnings("unchecked") @Override protected void handleCommand(final Object message) throws Exception { - if (message instanceof GetAllBuckets) { + if (GET_ALL_BUCKETS == message) { // GetAllBuckets is used only in testing - receiveGetAllBuckets(); + getSender().tell(getAllBuckets(), self()); return; } @@ -121,14 +146,11 @@ public class BucketStore> extends AbstractUntypedPersist return; } - if (message instanceof GetBucketsByMembers) { - receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); - } else if (message instanceof GetBucketVersions) { - receiveGetBucketVersions(); - } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); - } else if (message instanceof RemoveRemoteBucket) { - removeBucket(((RemoveRemoteBucket) message).getAddress()); + if (message instanceof ExecuteInActor) { + ((ExecuteInActor) message).accept(this); + } else if (GET_BUCKET_VERSIONS == message) { + // FIXME: do we need to send ourselves? + getSender().tell(ImmutableMap.copyOf(versions), getSelf()); } else if (message instanceof Terminated) { actorTerminated((Terminated) message); } else if (message instanceof DeleteSnapshotsSuccess) { @@ -161,7 +183,7 @@ public class BucketStore> extends AbstractUntypedPersist } @Override - protected void handleRecover(final Object message) throws Exception { + protected final void handleRecover(final Object message) throws Exception { if (message instanceof RecoveryCompleted) { if (incarnation != null) { incarnation = incarnation + 1; @@ -182,25 +204,47 @@ public class BucketStore> extends AbstractUntypedPersist } } - protected RemoteRpcProviderConfig getConfig() { + protected final RemoteRpcProviderConfig getConfig() { return config; } + protected final void updateLocalBucket(final T data) { + final LocalBucket local = getLocalBucket(); + final boolean bumpIncarnation = local.setData(data); + versions.put(selfAddress, local.getVersion()); + + if (bumpIncarnation) { + LOG.debug("Version wrapped. incrementing incarnation"); + + Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); + incarnation = incarnation + 1; + + persisting = true; + saveSnapshot(incarnation); + } + } + /** - * Returns all the buckets the this node knows about, self owned + remote. + * Callback to subclasses invoked when a bucket is removed. + * + * @param address Remote address + * @param bucket Bucket removed */ - @VisibleForTesting - protected void receiveGetAllBuckets() { - final ActorRef sender = getSender(); - sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf()); - } + protected abstract void onBucketRemoved(final Address address, final Bucket bucket); + + /** + * Callback to subclasses invoked when the set of remote buckets is updated. + * + * @param newBuckets Map of address to new bucket. Never null, but can be empty. + */ + protected abstract void onBucketsUpdated(final Map> newBuckets); /** * Helper to collect all known buckets. * * @return self owned + remote buckets */ - Map> getAllBuckets() { + private Map> getAllBuckets() { Map> all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket @@ -212,24 +256,12 @@ public class BucketStore> extends AbstractUntypedPersist return all; } - /** - * Returns buckets for requested members that this node knows about. - * - * @param members requested members - */ - void receiveGetBucketsByMembers(final Set
members) { - final ActorRef sender = getSender(); - Map> buckets = getBucketsByMembers(members); - sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf()); - } - /** * Helper to collect buckets for requested members. * * @param members requested members - * @return buckets for requested members */ - Map> getBucketsByMembers(final Set
members) { + private void getBucketsByMembers(final Collection
members) { Map> buckets = new HashMap<>(); //first add the local bucket if asked @@ -244,16 +276,15 @@ public class BucketStore> extends AbstractUntypedPersist } } - return buckets; + getSender().tell(buckets, getSelf()); } - /** - * Returns versions for all buckets known. - */ - void receiveGetBucketVersions() { - final ActorRef sender = getSender(); - GetBucketVersionsReply reply = new GetBucketVersionsReply(versions); - sender.tell(reply, getSelf()); + private void removeBucket(final Address addr) { + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref)); + onBucketRemoved(addr, bucket); + } } /** @@ -262,7 +293,8 @@ public class BucketStore> extends AbstractUntypedPersist * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ - void receiveUpdateRemoteBuckets(final Map> receivedBuckets) { + @VisibleForTesting + void updateRemoteBuckets(final Map> receivedBuckets) { LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); if (receivedBuckets == null || receivedBuckets.isEmpty()) { //nothing to do @@ -270,7 +302,7 @@ public class BucketStore> extends AbstractUntypedPersist } final Map> newBuckets = new HashMap<>(receivedBuckets.size()); - for (Entry> entry : receivedBuckets.entrySet()) { + for (Entry> entry : receivedBuckets.entrySet()) { final Address addr = entry.getKey(); if (selfAddress.equals(addr)) { @@ -278,7 +310,8 @@ public class BucketStore> extends AbstractUntypedPersist continue; } - final Bucket receivedBucket = entry.getValue(); + @SuppressWarnings("unchecked") + final Bucket receivedBucket = (Bucket) entry.getValue(); if (receivedBucket == null) { LOG.debug("Ignoring null bucket from {}", addr); continue; @@ -328,14 +361,6 @@ public class BucketStore> extends AbstractUntypedPersist } } - private void removeBucket(final Address addr) { - final Bucket bucket = remoteBuckets.remove(addr); - if (bucket != null) { - bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref)); - onBucketRemoved(addr, bucket); - } - } - private void actorTerminated(final Terminated message) { LOG.info("Actor termination {} received", message); @@ -349,60 +374,13 @@ public class BucketStore> extends AbstractUntypedPersist } } - /** - * Callback to subclasses invoked when a bucket is removed. - * - * @param address Remote address - * @param bucket Bucket removed - */ - protected void onBucketRemoved(final Address address, final Bucket bucket) { - // Default noop - } - - /** - * Callback to subclasses invoked when the set of remote buckets is updated. - * - * @param newBuckets Map of address to new bucket. Never null, but can be empty. - */ - protected void onBucketsUpdated(final Map> newBuckets) { - // Default noop - } - @VisibleForTesting protected boolean isPersisting() { return persisting; } - public T getLocalData() { - return getLocalBucket().getData(); - } - private LocalBucket getLocalBucket() { Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); return localBucket; } - - protected void updateLocalBucket(final T data) { - final LocalBucket local = getLocalBucket(); - final boolean bumpIncarnation = local.setData(data); - versions.put(selfAddress, local.getVersion()); - - if (bumpIncarnation) { - LOG.debug("Version wrapped. incrementing incarnation"); - - Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); - incarnation = incarnation + 1; - - persisting = true; - saveSnapshot(incarnation); - } - } - - public Map> getRemoteBuckets() { - return remoteBuckets; - } - - public Map getVersions() { - return versions; - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipEnvelope.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipEnvelope.java new file mode 100644 index 0000000000..a6a0c2b6f7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipEnvelope.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.Address; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.io.Serializable; +import java.util.Map; + +final class GossipEnvelope implements Serializable { + private static final long serialVersionUID = 1L; + + private final Map> buckets; + private final Address from; + private final Address to; + + GossipEnvelope(final Address from, final Address to, final Map> buckets) { + this.to = Preconditions.checkNotNull(to); + this.buckets = ImmutableMap.copyOf(buckets); + this.from = from; + } + + Map> buckets() { + return buckets; + } + + Address from() { + return from; + } + + Address to() { + return to; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipStatus.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipStatus.java new file mode 100644 index 0000000000..db62185dd1 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossipStatus.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.Address; +import com.google.common.collect.ImmutableMap; +import java.io.Serializable; +import java.util.Map; + +final class GossipStatus implements Serializable { + private static final long serialVersionUID = 1L; + + private final Map versions; + private final Address from; + + GossipStatus(final Address from, final Map versions) { + this.versions = ImmutableMap.copyOf(versions); + this.from = from; + } + + Address from() { + return from; + } + + Map versions() { + return versions; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 2c47c4e2a9..015c0a1b0e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -17,8 +17,6 @@ import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; -import akka.dispatch.Mapper; -import akka.pattern.Patterns; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; @@ -27,21 +25,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; /** @@ -63,6 +52,13 @@ import scala.concurrent.duration.FiniteDuration; * for update. */ public class Gossiper extends AbstractUntypedActorWithMetering { + private static final Object GOSSIP_TICK = new Object() { + @Override + public String toString() { + return "gossip tick"; + } + }; + private final boolean autoStartGossipTicks; private final RemoteRpcProviderConfig config; @@ -85,6 +81,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { private Cancellable gossipTask; + private BucketStoreAccess bucketStore; + Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) { this.config = Preconditions.checkNotNull(config); this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); @@ -107,6 +105,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); + bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration()); + if (provider instanceof ClusterActorRefProvider ) { cluster = Cluster.get(getContext().system()); cluster.subscribe(getSelf(), @@ -121,7 +121,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { new FiniteDuration(1, TimeUnit.SECONDS), //initial delay config.getGossipTickInterval(), //interval getSelf(), //target - new Messages.GossiperMessages.GossipTick(), //message + GOSSIP_TICK, //message getContext().dispatcher(), //execution context getSelf() //sender ); @@ -138,12 +138,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } } - @SuppressWarnings({ "rawtypes", "unchecked" }) @Override protected void handleReceive(final Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) { + if (GOSSIP_TICK.equals(message)) { receiveGossipTick(); } else if (message instanceof GossipStatus) { // Message from remote gossiper with its bucket versions @@ -197,7 +196,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { private void removePeer(final Address address) { clusterMembers.remove(address); peers.remove(address); - getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); + bucketStore.removeRemoteBucket(address); } /** @@ -258,15 +257,53 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @VisibleForTesting void receiveGossipStatus(final GossipStatus status) { // Don't accept messages from non-members - if (!peers.containsKey(status.from())) { - return; + if (peers.containsKey(status.from())) { + // FIXME: sender should be part of GossipStatus + final ActorRef sender = getSender(); + bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions)); + } + } + + private void processRemoteStatus(final ActorRef remote, final GossipStatus status, + final Map localVersions) { + final Map remoteVersions = status.versions(); + + //diff between remote list and local + final Set
localIsOlder = new HashSet<>(remoteVersions.keySet()); + localIsOlder.removeAll(localVersions.keySet()); + + //diff between local list and remote + final Set
localIsNewer = new HashSet<>(localVersions.keySet()); + localIsNewer.removeAll(remoteVersions.keySet()); + + + for (Entry entry : remoteVersions.entrySet()) { + Address address = entry.getKey(); + Long remoteVersion = entry.getValue(); + Long localVersion = localVersions.get(address); + if (localVersion == null || remoteVersion == null) { + //this condition is taken care of by above diffs + continue; + } + + if (localVersion < remoteVersion) { + localIsOlder.add(address); + } else if (localVersion > remoteVersion) { + localIsNewer.add(address); + } } - final ActorRef sender = getSender(); - Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); + if (!localIsOlder.isEmpty()) { + remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); + } - futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); + if (!localIsNewer.isEmpty()) { + //send newer buckets to remote + bucketStore.getBucketsByMembers(localIsNewer, buckets -> { + LOG.trace("Buckets to send from {}: {}", selfAddress, buckets); + remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf()); + }); + } } /** @@ -275,14 +312,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param envelope contains buckets from a remote gossiper */ @VisibleForTesting - > void receiveGossip(final GossipEnvelope envelope) { + void receiveGossip(final GossipEnvelope envelope) { //TODO: Add more validations if (!selfAddress.equals(envelope.to())) { LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); return; } - updateRemoteBuckets(envelope.getBuckets()); + updateRemoteBuckets(envelope.buckets()); } /** @@ -291,21 +328,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param buckets map of Buckets to update */ @VisibleForTesting - > void updateRemoteBuckets(final Map> buckets) { - getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf()); - } - - /** - * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper. - * - * @param remote remote node to send Buckets to - * @param addresses node addresses whose buckets needs to be sent - */ - void sendGossipTo(final ActorRef remote, final Set
addresses) { - - Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration()); - futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); + void updateRemoteBuckets(final Map> buckets) { + bucketStore.updateRemoteBuckets(buckets); } /** @@ -315,133 +339,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) { - - //Get local status from bucket store and send to remote - Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); - - LOG.trace("Sending bucket versions to [{}]", remoteGossiper); - - futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher()); - } - - /// - /// Private factories to create mappers - /// - - private Mapper getMapperToSendLocalStatus(final ActorSelection remote) { - - return new Mapper() { - @Override - public Void apply(final Object replyMessage) { - if (replyMessage instanceof GetBucketVersionsReply) { - GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; - Map localVersions = reply.getVersions(); - - remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); - } - return null; - } - }; - } - - /** - * Process bucket versions received from - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}. - * Then this method compares remote bucket versions with local bucket versions. - *
    - *
  • The buckets that are newer locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} - * to remote - *
  • The buckets that are older locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} - * to remote so that remote sends GossipEnvelop. - *
- * - * @param sender the remote member - * @param status bucket versions from a remote member - * @return a {@link akka.dispatch.Mapper} that gets evaluated in future - * - */ - private Mapper getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) { - - final Map remoteVersions = status.getVersions(); - - return new Mapper() { - @Override - public Void apply(final Object replyMessage) { - if (replyMessage instanceof GetBucketVersionsReply) { - GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; - Map localVersions = reply.getVersions(); - - //diff between remote list and local - Set
localIsOlder = new HashSet<>(); - localIsOlder.addAll(remoteVersions.keySet()); - localIsOlder.removeAll(localVersions.keySet()); - - //diff between local list and remote - Set
localIsNewer = new HashSet<>(); - localIsNewer.addAll(localVersions.keySet()); - localIsNewer.removeAll(remoteVersions.keySet()); - - - for (Map.Entry entry : remoteVersions.entrySet()) { - Address address = entry.getKey(); - Long remoteVersion = entry.getValue(); - Long localVersion = localVersions.get(address); - if (localVersion == null || remoteVersion == null) { - //this condition is taken care of by above diffs - continue; - } - - if (localVersion < remoteVersion) { - localIsOlder.add(address); - } else if (localVersion > remoteVersion) { - localIsNewer.add(address); - } - } - - if (!localIsOlder.isEmpty()) { - sender.tell(new GossipStatus(selfAddress, localVersions), getSelf()); - } - - if (!localIsNewer.isEmpty()) { - //send newer buckets to remote - sendGossipTo(sender, localIsNewer); - } - } - return null; - } - }; - } - - /** - * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} - * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. - * These buckets are sent to a remote member encapsulated in - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} - * - * @param sender the remote member that sent - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} - * in reply to which bucket is being sent back - * @return a {@link akka.dispatch.Mapper} that gets evaluated in future - * - */ - private Mapper getMapperToSendGossip(final ActorRef sender) { - - return new Mapper() { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Void apply(final Object msg) { - if (msg instanceof GetBucketsByMembersReply) { - Map> buckets = ((GetBucketsByMembersReply) msg).getBuckets(); - LOG.trace("Buckets to send from {}: {}", selfAddress, buckets); - GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); - sender.tell(envelope, getSelf()); - } - return null; - } - }; + bucketStore.getBucketVersions(versions -> { + LOG.trace("Sending bucket versions to [{}]", remoteGossiper); + /* + * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring, + * but can we identify which bucket is the local one? + */ + remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf()); + }); } /// diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java index b2c5e0c833..e5a2a54883 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java @@ -23,7 +23,7 @@ final class LocalBucket> { * * We are keeping a boxed version here, as we stick it into a map anyway. */ - private Long version; + private long version; private T data; // We bump versions only if we took a snapshot since last data update @@ -39,7 +39,7 @@ final class LocalBucket> { return data; } - Long getVersion() { + long getVersion() { return version; } @@ -50,15 +50,11 @@ final class LocalBucket> { boolean setData(final T data) { this.data = Preconditions.checkNotNull(data); - if (bumpVersion) { - final long next = version.longValue() + 1; - if ((next & 0xffff_ffffL) == 0) { - return true; - } - - version = next; - bumpVersion = false; + if (!bumpVersion) { + return false; } - return false; + + bumpVersion = false; + return (++version & 0xffff_ffffL) == 0; } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java deleted file mode 100644 index 361f5b7e1c..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc.registry.gossip; - -import akka.actor.Address; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets; - -/** - * These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors. - */ -public class Messages { - - public static class BucketStoreMessages { - - public static final class GetAllBuckets implements Serializable { - private static final long serialVersionUID = 1L; - } - - public static final class GetBucketsByMembers implements Serializable { - private static final long serialVersionUID = 1L; - private final Set
members; - - public GetBucketsByMembers(final Set
members) { - Preconditions.checkArgument(members != null, "members can not be null"); - this.members = ImmutableSet.copyOf(members); - } - - public Set
getMembers() { - return members; - } - } - - public static class ContainsBuckets> implements Serializable { - private static final long serialVersionUID = -4940160367495308286L; - - private final Map> buckets; - - protected ContainsBuckets(final Map> buckets) { - Preconditions.checkArgument(buckets != null, "buckets can not be null"); - this.buckets = Collections.unmodifiableMap(new HashMap<>(buckets)); - } - - public final Map> getBuckets() { - return buckets; - } - } - - public static final class GetAllBucketsReply> extends ContainsBuckets { - private static final long serialVersionUID = 1L; - - public GetAllBucketsReply(final Map> buckets) { - super(buckets); - } - } - - public static final class GetBucketsByMembersReply> extends ContainsBuckets { - private static final long serialVersionUID = 1L; - - public GetBucketsByMembersReply(final Map> buckets) { - super(buckets); - } - } - - public static final class GetBucketVersions implements Serializable { - private static final long serialVersionUID = 1L; - } - - public static class ContainsBucketVersions implements Serializable { - private static final long serialVersionUID = -8172148925383801613L; - - Map versions; - - public ContainsBucketVersions(final Map versions) { - Preconditions.checkArgument(versions != null, "versions can not be null or empty"); - - this.versions = ImmutableMap.copyOf(versions); - } - - public Map getVersions() { - return versions; - } - } - - public static final class GetBucketVersionsReply extends ContainsBucketVersions { - private static final long serialVersionUID = 1L; - - public GetBucketVersionsReply(final Map versions) { - super(versions); - } - } - - public static final class UpdateRemoteBuckets> extends ContainsBuckets { - private static final long serialVersionUID = 1L; - - public UpdateRemoteBuckets(final Map> buckets) { - super(buckets); - } - } - - /** - * Message sent from the gossiper to its parent, therefore not Serializable, requesting removal - * of a bucket corresponding to an address. - */ - public static final class RemoveRemoteBucket { - private final Address address; - - public RemoveRemoteBucket(final Address address) { - this.address = Preconditions.checkNotNull(address); - } - - public Address getAddress() { - return address; - } - } - } - - public static class GossiperMessages { - public static class Tick implements Serializable { - private static final long serialVersionUID = -4770935099506366773L; - } - - public static final class GossipTick extends Tick { - private static final long serialVersionUID = 5803354404380026143L; - } - - public static final class GossipStatus extends ContainsBucketVersions { - private static final long serialVersionUID = -593037395143883265L; - - private final Address from; - - public GossipStatus(final Address from, final Map versions) { - super(versions); - this.from = from; - } - - public Address from() { - return from; - } - } - - public static final class GossipEnvelope> extends ContainsBuckets { - private static final long serialVersionUID = 8346634072582438818L; - - private final Address from; - private final Address to; - - public GossipEnvelope(final Address from, final Address to, final Map> buckets) { - super(buckets); - Preconditions.checkArgument(to != null, "Recipient of message must not be null"); - this.to = to; - this.from = from; - } - - public Address from() { - return from; - } - - public Address to() { - return to; - } - } - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java index 339d4b1a32..eabb170db2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java @@ -15,10 +15,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.registry.RoutingTable; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; -import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +27,6 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot protected final Logger log = LoggerFactory.getLogger(getClass()); - private static final String NULL_CONSTANT = "null"; - private static final String LOCAL_CONSTANT = "local"; private static final String ROUTE_CONSTANT = "route:"; @@ -47,9 +45,9 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot public Set getGlobalRpc() { RoutingTable table = rpcRegistry.getLocalData(); Set globalRpc = new HashSet<>(table.getRoutes().size()); - for (RpcRouter.RouteIdentifier route : table.getRoutes()) { - if (route.getRoute() == null) { - globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT); + for (DOMRpcIdentifier route : table.getRoutes()) { + if (route.getContextReference().isEmpty()) { + globalRpc.add(route.getType().toString()); } } @@ -61,11 +59,10 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot public Set getLocalRegisteredRoutedRpc() { RoutingTable table = rpcRegistry.getLocalData(); Set routedRpc = new HashSet<>(table.getRoutes().size()); - for (RpcRouter.RouteIdentifier route : table.getRoutes()) { - if (route.getRoute() != null) { + for (DOMRpcIdentifier route : table.getRoutes()) { + if (!route.getContextReference().isEmpty()) { StringBuilder builder = new StringBuilder(ROUTE_CONSTANT); - builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null - ? route.getType().toString() : NULL_CONSTANT); + builder.append(route.getContextReference().toString()).append(NAME_CONSTANT).append(route.getType()); routedRpc.add(builder.toString()); } } @@ -111,15 +108,14 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot */ private static Map getRpcMemberMapByRoute(final RoutingTable table, final String routeName, final String address) { - Set> routes = table.getRoutes(); + Set routes = table.getRoutes(); Map rpcMap = new HashMap<>(routes.size()); - for (RpcRouter.RouteIdentifier route : table.getRoutes()) { - if (route.getRoute() != null) { - String routeString = route.getRoute().toString(); + for (DOMRpcIdentifier route : routes) { + if (!route.getContextReference().isEmpty()) { + String routeString = route.getContextReference().toString(); if (routeString.contains(routeName)) { StringBuilder builder = new StringBuilder(ROUTE_CONSTANT); - builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null - ? route.getType().toString() : NULL_CONSTANT); + builder.append(routeString).append(NAME_CONSTANT).append(route.getType()); rpcMap.put(builder.toString(), address); } } @@ -132,15 +128,14 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot */ private static Map getRpcMemberMapByName(final RoutingTable table, final String name, final String address) { - Set> routes = table.getRoutes(); + Set routes = table.getRoutes(); Map rpcMap = new HashMap<>(routes.size()); - for (RpcRouter.RouteIdentifier route : routes) { - if (route.getType() != null) { + for (DOMRpcIdentifier route : routes) { + if (!route.getContextReference().isEmpty()) { String type = route.getType().toString(); if (type.contains(name)) { StringBuilder builder = new StringBuilder(ROUTE_CONSTANT); - builder.append(route.getRoute() != null ? route.getRoute().toString() : NULL_CONSTANT) - .append(NAME_CONSTANT).append(type); + builder.append(route.getContextReference()).append(NAME_CONSTANT).append(type); rpcMap.put(builder.toString(), address); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/utils/ConditionalProbe.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/utils/ConditionalProbe.java deleted file mode 100644 index 462a514ac9..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/utils/ConditionalProbe.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.utils; - -import akka.actor.ActorRef; -import com.google.common.base.Predicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConditionalProbe { - private final ActorRef actorRef; - private final Predicate predicate; - Logger log = LoggerFactory.getLogger(ConditionalProbe.class); - - public ConditionalProbe(ActorRef actorRef, Predicate predicate) { - this.actorRef = actorRef; - this.predicate = predicate; - } - - public void tell(Object message, ActorRef sender) { - if (predicate.apply(message)) { - log.info("sending message to probe {}", message); - actorRef.tell(message, sender); - } - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index 0c3a57e8c9..84062735c2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.remote.rpc.registry; import static org.junit.Assert.fail; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -27,6 +29,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -40,20 +43,15 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -183,7 +181,7 @@ public class RpcRegistryTest { // Add rpc on node 1 - List> addedRouteIds = createRouteIds(); + List addedRouteIds = createRouteIds(); registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender()); @@ -217,7 +215,7 @@ public class RpcRegistryTest { LOG.info("testRpcAddRemoveInCluster starting"); - List> addedRouteIds = createRouteIds(); + List addedRouteIds = createRouteIds(); Address node1Address = node1.provider().getDefaultAddress(); @@ -249,7 +247,7 @@ public class RpcRegistryTest { buckets = retrieveBuckets(registry1, testKit, address); try { - verifyBucket(buckets.get(address), Collections.>emptyList()); + verifyBucket(buckets.get(address), Collections.emptyList()); break; } catch (AssertionError e) { if (++numTries >= 50) { @@ -269,14 +267,14 @@ public class RpcRegistryTest { final JavaTestKit testKit = new JavaTestKit(node3); // Add rpc on node 1 - List> addedRouteIds1 = createRouteIds(); + List addedRouteIds1 = createRouteIds(); registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender()); final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), UpdateRemoteEndpoints.class); // Add rpc on node 2 - List> addedRouteIds2 = createRouteIds(); + List addedRouteIds2 = createRouteIds(); registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender()); final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), @@ -320,17 +318,16 @@ public class RpcRegistryTest { } private static Map retrieveVersions(final ActorRef bucketStore, final JavaTestKit testKit) { - bucketStore.tell(new GetBucketVersions(), testKit.getRef()); - GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), - GetBucketVersionsReply.class); - return reply.getVersions(); + bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef()); + @SuppressWarnings("unchecked") + final Map reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), Map.class); + return reply; } - private static void verifyBucket(final Bucket bucket, - final List> expRouteIds) { + private static void verifyBucket(final Bucket bucket, final List expRouteIds) { RoutingTable table = bucket.getData(); Assert.assertNotNull("Bucket RoutingTable is null", table); - for (RouteIdentifier r : expRouteIds) { + for (DOMRpcIdentifier r : expRouteIds) { if (!table.contains(r)) { Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table); } @@ -343,12 +340,11 @@ public class RpcRegistryTest { final JavaTestKit testKit, final Address... addresses) { int numTries = 0; while (true) { - bucketStore.tell(new GetAllBuckets(), testKit.getRef()); + bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef()); @SuppressWarnings("unchecked") - GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), - GetAllBucketsReply.class); + Map> buckets = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + Map.class); - Map> buckets = reply.getBuckets(); boolean foundAll = true; for (Address addr : addresses) { Bucket bucket = buckets.get(addr); @@ -376,30 +372,29 @@ public class RpcRegistryTest { final JavaTestKit testKit = new JavaTestKit(node1); final int nRoutes = 500; - final RouteIdentifier[] added = new RouteIdentifier[nRoutes]; + final Collection added = new ArrayList<>(nRoutes); for (int i = 0; i < nRoutes; i++) { - final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, - new QName(new URI("/mockrpc"), "type" + i), null); - added[i] = routeId; + final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true, + new QName(new URI("/mockrpc"), "type" + i))); + added.add(routeId); //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - registry1.tell(new AddOrUpdateRoutes(Arrays.>asList(routeId)), + registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)), ActorRef.noSender()); } - GetAllBuckets getAllBuckets = new GetAllBuckets(); FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); int numTries = 0; while (true) { - registry1.tell(getAllBuckets, testKit.getRef()); + registry1.tell(GET_ALL_BUCKETS, testKit.getRef()); @SuppressWarnings("unchecked") - GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class); + Map> buckets = testKit.expectMsgClass(duration, Map.class); - Bucket localBucket = reply.getBuckets().values().iterator().next(); + Bucket localBucket = buckets.values().iterator().next(); RoutingTable table = localBucket.getData(); if (table != null && table.size() == nRoutes) { - for (RouteIdentifier r : added) { - Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r)); + for (DOMRpcIdentifier r : added) { + Assert.assertTrue("RoutingTable contains " + r, table.contains(r)); } break; @@ -413,10 +408,10 @@ public class RpcRegistryTest { } } - private List> createRouteIds() throws URISyntaxException { + private List createRouteIds() throws URISyntaxException { QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++); - List> routeIds = new ArrayList<>(1); - routeIds.add(new RouteIdentifierImpl(null, type, null)); + List routeIds = new ArrayList<>(1); + routeIds.add(DOMRpcIdentifier.create(SchemaPath.create(true, type))); return routeIds; } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 59dd29b314..4898b27ec9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -7,39 +7,23 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; -import static akka.actor.ActorRef.noSender; - import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; -import akka.actor.Status.Success; -import akka.pattern.Patterns; -import akka.persistence.SaveSnapshotSuccess; import akka.testkit.JavaTestKit; -import akka.util.Timeout; +import akka.testkit.TestActorRef; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.ConfigFactory; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; public class BucketStoreTest { @@ -58,8 +42,6 @@ public class BucketStoreTest { private static ActorSystem system; - private JavaTestKit kit; - @BeforeClass public static void setup() { system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); @@ -71,74 +53,55 @@ public class BucketStoreTest { JavaTestKit.shutdownActorSystem(system); } - @Before - public void before() { - kit = new JavaTestKit(system); - } - - @After - public void after() { - kit.shutdown(system); - } - /** * Given remote buckets, should merge with local copy of remote buckets. */ @Test - public void testReceiveUpdateRemoteBuckets() throws Exception { + public void testReceiveUpdateRemoteBuckets() { + + final BucketStoreActor store = createStore(); - final ActorRef store = createStore(); Address localAddress = system.provider().getDefaultAddress(); Bucket localBucket = new BucketImpl<>(0L, new T()); - Address a1 = new Address("tcp", "system1"); - Address a2 = new Address("tcp", "system2"); - Address a3 = new Address("tcp", "system3"); + final Address a1 = new Address("tcp", "system1"); + final Address a2 = new Address("tcp", "system2"); + final Address a3 = new Address("tcp", "system3"); - Bucket b1 = new BucketImpl<>(0L, new T()); - Bucket b2 = new BucketImpl<>(0L, new T()); - Bucket b3 = new BucketImpl<>(0L, new T()); - - Map> remoteBuckets = new HashMap<>(3); - remoteBuckets.put(a1, b1); - remoteBuckets.put(a2, b2); - remoteBuckets.put(a3, b3); - remoteBuckets.put(localAddress, localBucket); - - Await.result(Patterns.ask(store, new WaitUntilDonePersisting(), - Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf()); + final Bucket b1 = new BucketImpl<>(0L, new T()); + final Bucket b2 = new BucketImpl<>(0L, new T()); + final Bucket b3 = new BucketImpl<>(0L, new T()); //Given remote buckets - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(ImmutableMap.of(a1, b1, a2, b2, localAddress, localBucket)); - //Should contain local bucket - //Should contain 4 entries i.e a1, a2, a3, local - Map> remoteBucketsInStore = getBuckets(store); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + //Should NOT contain local bucket + //Should contain ONLY 3 entries i.e a1, a2 + Map> remoteBucketsInStore = store.getRemoteBuckets(); + Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); + Assert.assertTrue(remoteBucketsInStore.size() == 2); //Add a new remote bucket Address a4 = new Address("tcp", "system4"); Bucket b4 = new BucketImpl<>(0L, new T()); - remoteBuckets.clear(); - remoteBuckets.put(a4, b4); - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(ImmutableMap.of(a4, b4)); //Should contain a4 - //Should contain 5 entries now i.e a1, a2, a3, a4, local - remoteBucketsInStore = getBuckets(store); + //Should contain 4 entries now i.e a1, a2, a4 + remoteBucketsInStore = store.getRemoteBuckets(); Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4)); - Assert.assertTrue(remoteBucketsInStore.size() == 5); + Assert.assertTrue(remoteBucketsInStore.size() == 3); //Update a bucket Bucket b3New = new BucketImpl<>(0L, new T()); - remoteBuckets.clear(); + Map> remoteBuckets = new HashMap<>(3); remoteBuckets.put(a3, b3New); remoteBuckets.put(a1, null); remoteBuckets.put(a2, null); - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(remoteBuckets); //Should only update a3 - remoteBucketsInStore = getBuckets(store); + remoteBucketsInStore = store.getRemoteBuckets(); Bucket b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion()); @@ -147,11 +110,11 @@ public class BucketStoreTest { Bucket b2InStore = remoteBucketsInStore.get(a2); Assert.assertEquals(b1.getVersion(), b1InStore.getVersion()); Assert.assertEquals(b2.getVersion(), b2InStore.getVersion()); - Assert.assertTrue(remoteBucketsInStore.size() == 5); + Assert.assertTrue(remoteBucketsInStore.size() == 4); //Should update versions map //versions map contains versions for all remote buckets (4). - Map versionsInStore = getVersions(store); + Map versionsInStore = store.getVersions(); Assert.assertEquals(4, versionsInStore.size()); Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1)); Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2)); @@ -161,12 +124,13 @@ public class BucketStoreTest { //Send older version of bucket remoteBuckets.clear(); remoteBuckets.put(a3, b3); - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(remoteBuckets); //Should NOT update a3 - remoteBucketsInStore = getBuckets(store); + remoteBucketsInStore = store.getRemoteBuckets(); b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion()); + } /** @@ -174,63 +138,28 @@ public class BucketStoreTest { * * @return instance of BucketStore class */ - private ActorRef createStore() { - return kit.childActorOf(Props.create(TestingBucketStore.class, - new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T())); - } - - @SuppressWarnings("unchecked") - private static Map> getBuckets(final ActorRef store) throws Exception { - final GetAllBucketsReply result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(), - Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf()); - return result.getBuckets(); + private static BucketStoreActor createStore() { + final Props props = Props.create(TestingBucketStoreActor.class, + new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T()); + return TestActorRef.>create(system, props, "testStore").underlyingActor(); } - @SuppressWarnings("unchecked") - private static Map getVersions(final ActorRef store) throws Exception { - return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(), - Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions(); - } + private static final class TestingBucketStoreActor extends BucketStoreActor { - private static final class TestingBucketStore extends BucketStore { - - private final List toNotify = new ArrayList<>(); - - TestingBucketStore(final RemoteRpcProviderConfig config, - final String persistenceId, - final T initialData) { + protected TestingBucketStoreActor(final RemoteRpcProviderConfig config, + final String persistenceId, + final T initialData) { super(config, persistenceId, initialData); } @Override - protected void handleCommand(Object message) throws Exception { - if (message instanceof WaitUntilDonePersisting) { - handlePersistAsk(); - } else if (message instanceof SaveSnapshotSuccess) { - super.handleCommand(message); - handleSnapshotSuccess(); - } else { - super.handleCommand(message); - } - } + protected void onBucketRemoved(final Address address, final Bucket bucket) { - private void handlePersistAsk() { - if (isPersisting()) { - toNotify.add(getSender()); - } else { - getSender().tell(new Success(null), noSender()); - } } - private void handleSnapshotSuccess() { - toNotify.forEach(ref -> ref.tell(new Success(null), noSender())); - } - } - - /** - * Message sent to the TestingBucketStore that replies with success once the actor is done persisting. - */ - private static final class WaitUntilDonePersisting { + @Override + protected void onBucketsUpdated(final Map> newBuckets) { + } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java index 9fccb06943..f1e7fea6f5 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java @@ -31,8 +31,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; public class GossiperTest { @@ -63,7 +61,6 @@ public class GossiperTest { @After public void resetMocks() { reset(mockGossiper); - } @Test @@ -85,7 +82,6 @@ public class GossiperTest { @SuppressWarnings("unchecked") @Test public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore() { - Address nonMember = new Address("tcp", "non-member"); GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class)); @@ -95,14 +91,12 @@ public class GossiperTest { verify(mockGossiper, times(0)).getSender(); } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings("unchecked") @Test public void testReceiveGossipWhenNotAddressedToSelfShouldIgnore() { - Address notSelf = new Address("tcp", "not-self"); - - GossipEnvelope envelope = new GossipEnvelope(notSelf, notSelf, mock(Map.class)); doNothing().when(mockGossiper).updateRemoteBuckets(anyMap()); - mockGossiper.receiveGossip(envelope); + Address notSelf = new Address("tcp", "not-self"); + mockGossiper.receiveGossip(new GossipEnvelope(notSelf, notSelf, mock(Map.class))); verify(mockGossiper, times(0)).updateRemoteBuckets(anyMap()); } @@ -112,7 +106,10 @@ public class GossiperTest { * @return instance of Gossiper class */ private static Gossiper createGossiper() { - final Props props = Gossiper.testProps(new RemoteRpcProviderConfig(system.settings().config())); + final RemoteRpcProviderConfig config = + new RemoteRpcProviderConfig.Builder("unit-test") + .withConfigReader(ConfigFactory::load).build(); + final Props props = Gossiper.testProps(config); final TestActorRef testRef = TestActorRef.create(system, props, "testGossiper"); return testRef.underlyingActor(); -- 2.36.6