<artifactId>akka-persistence_${scala.version}</artifactId>
</dependency>
<!-- SAL Dependencies -->
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-connector-api</artifactId>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-common-util</artifactId>
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-public class RouteIdentifierImpl
- implements RpcRouter.RouteIdentifier<QName, QName, YangInstanceIdentifier>, Serializable {
- private static final long serialVersionUID = 1L;
-
- private final QName context;
- private final QName type;
- private final YangInstanceIdentifier route;
-
- public RouteIdentifierImpl(final QName context, final QName type, final YangInstanceIdentifier route) {
- Preconditions.checkNotNull(type, "Rpc type should not be null");
- this.context = context;
- this.type = type;
- this.route = route;
- }
-
- @Override
- public QName getContext() {
- return context;
- }
-
- @Override
- public QName getType() {
- return type;
- }
-
- @Override
- public YangInstanceIdentifier getRoute() {
- return route;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
-
- final RouteIdentifierImpl that = (RouteIdentifierImpl) obj;
-
- if (context == null) {
- if (that.getContext() != null) {
- return false;
- }
- } else if (!context.equals(that.context)) {
- return false;
- }
-
- if (route == null) {
- if (that.getRoute() != null) {
- return false;
- }
- } else if (!route.equals(that.route)) {
- return false;
- }
-
- if (type == null) {
- if (that.getType() != null) {
- return false;
- }
- } else if (!type.equals(that.type)) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 0;
- result = prime * result + (context == null ? 0 : context.hashCode());
- result = prime * result + (type == null ? 0 : type.hashCode());
- result = prime * result + (route == null ? 0 : route.hashCode());
- return result;
- }
-
- @Override
- public String toString() {
- return "RouteIdentifierImpl{" + "context=" + context + ", type=" + type + ", route=" + route + '}';
- }
-}
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
LOG.debug("Adding registration for [{}]", rpcs);
- final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
-
- for (final DOMRpcIdentifier rpc : rpcs) {
- // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
- final RouteIdentifier<?,?,?> routeId =
- new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
- routeIds.add(routeId);
- }
- rpcRegistry.tell(new AddOrUpdateRoutes(routeIds), ActorRef.noSender());
+ rpcRegistry.tell(new AddOrUpdateRoutes(rpcs), ActorRef.noSender());
}
@Override
Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
LOG.debug("Removing registration for [{}]", rpcs);
-
- final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
- for (final DOMRpcIdentifier rpc : rpcs) {
- routeIds.add(new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()));
- }
- rpcRegistry.tell(new RemoveRoutes(routeIds), ActorRef.noSender());
+ rpcRegistry.tell(new RemoveRoutes(rpcs), ActorRef.noSender());
}
@Override
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-public class RoutingTable implements BucketData<RoutingTable>, Serializable {
+public final class RoutingTable implements BucketData<RoutingTable>, Serializable {
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
+ private Collection<DOMRpcIdentifier> rpcs;
+ private ActorRef rpcInvoker;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final RoutingTable table) {
+ rpcs = table.getRoutes();
+ rpcInvoker = table.getRpcInvoker();
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeObject(Serialization.serializedActorPath(rpcInvoker));
+
+ final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out);
+ nnout.writeInt(rpcs.size());
+ for (DOMRpcIdentifier id : rpcs) {
+ nnout.writeSchemaPath(id.getType());
+ nnout.writeYangInstanceIdentifier(id.getContextReference());
+ }
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ rpcInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+
+ final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
+ final int size = nnin.readInt();
+ rpcs = new ArrayList<>(size);
+ for (int i = 0; i < size; ++i) {
+ rpcs.add(DOMRpcIdentifier.create(nnin.readSchemaPath(), nnin.readYangInstanceIdentifier()));
+ }
+ }
+
+ private Object readResolve() {
+ return new RoutingTable(rpcInvoker, rpcs);
+ }
+ }
+
private static final long serialVersionUID = 1L;
- private final Set<RouteIdentifier<?, ?, ?>> rpcs;
+ @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
+ private final Set<DOMRpcIdentifier> rpcs;
private final ActorRef rpcInvoker;
- private RoutingTable(final ActorRef rpcInvoker, final Set<RouteIdentifier<?, ?, ?>> table) {
+ RoutingTable(final ActorRef rpcInvoker, final Collection<DOMRpcIdentifier> table) {
this.rpcInvoker = Preconditions.checkNotNull(rpcInvoker);
this.rpcs = ImmutableSet.copyOf(table);
}
- RoutingTable(final ActorRef rpcInvoker) {
- this(rpcInvoker, ImmutableSet.of());
- }
-
@Override
public Optional<ActorRef> getWatchActor() {
return Optional.of(rpcInvoker);
}
- public Set<RouteIdentifier<?, ?, ?>> getRoutes() {
+ public Set<DOMRpcIdentifier> getRoutes() {
return rpcs;
}
return rpcInvoker;
}
- RoutingTable addRpcs(final Collection<RouteIdentifier<?, ?, ?>> toAdd) {
- final Set<RouteIdentifier<?, ?, ?>> newRpcs = new HashSet<>(rpcs);
+ RoutingTable addRpcs(final Collection<DOMRpcIdentifier> toAdd) {
+ final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
newRpcs.addAll(toAdd);
return new RoutingTable(rpcInvoker, newRpcs);
}
- RoutingTable removeRpcs(final Collection<RouteIdentifier<?, ?, ?>> toRemove) {
- final Set<RouteIdentifier<?, ?, ?>> newRpcs = new HashSet<>(rpcs);
+ RoutingTable removeRpcs(final Collection<DOMRpcIdentifier> toRemove) {
+ final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
newRpcs.removeAll(toRemove);
return new RoutingTable(rpcInvoker, newRpcs);
}
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
@VisibleForTesting
- boolean contains(final RouteIdentifier<?, ?, ?> routeId) {
+ boolean contains(final DOMRpcIdentifier routeId) {
return rpcs.contains(routeId);
}
import akka.actor.Address;
import akka.actor.Props;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
/**
* Registry to look up cluster nodes that have registered for a given RPC.
*
* <p>
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends BucketStore<RoutingTable> {
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
private final ActorRef rpcRegistrar;
public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
- super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker));
+ super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
}
for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
final RoutingTable table = e.getValue().getData();
- final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
- for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
- if (ri instanceof RouteIdentifierImpl) {
- final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
- rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
- } else {
- LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
- }
- }
-
+ final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
: Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
}
*/
public static class Messages {
abstract static class AbstractRouteMessage {
- final List<RouteIdentifier<?, ?, ?>> routeIdentifiers;
+ final List<DOMRpcIdentifier> routeIdentifiers;
- AbstractRouteMessage(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
"Route Identifiers must be supplied");
- this.routeIdentifiers = routeIdentifiers;
+ this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
}
- List<RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+ List<DOMRpcIdentifier> getRouteIdentifiers() {
return this.routeIdentifiers;
}
}
public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
- public AddOrUpdateRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
super(routeIdentifiers);
}
}
public static final class RemoveRoutes extends AbstractRouteMessage {
- public RemoveRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
super(routeIdentifiers);
}
}
import java.io.Serializable;
final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
- private static final long serialVersionUID = 294779770032719196L;
+ private static final long serialVersionUID = 1L;
- // Guaranteed to be non-null.
- // This is kept a Long for binary compatibility of serialization format.
- private final Long version;
+ private final long version;
// Guaranteed to be non-null
private final T data;
- BucketImpl(final Long version, final T data) {
- this.version = Preconditions.checkNotNull(version);
+ BucketImpl(final long version, final T data) {
+ this.version = version;
this.data = Preconditions.checkNotNull(data);
}
@Override
public long getVersion() {
- return version.longValue();
+ return version;
}
@Override
}
private Object readResolve() {
- Verify.verifyNotNull(version);
Verify.verifyNotNull(data);
return this;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getBucketsByMembersMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.removeBucketMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.updateRemoteBucketsMessage;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Address;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Convenience access to {@link BucketStoreActor}. Used mostly by {@link Gossiper}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+@VisibleForTesting
+public final class BucketStoreAccess {
+ private final ActorContext context;
+ private final Timeout timeout;
+
+ BucketStoreAccess(final ActorContext context, final Timeout timeout) {
+ this.context = Preconditions.checkNotNull(context);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
+ <T extends BucketData<T>> void getBucketsByMembers(final Collection<Address> members,
+ final Consumer<Map<Address, Bucket<T>>> callback) {
+ Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout)
+ .onComplete(new OnComplete<Object>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ if (failure == null) {
+ callback.accept((Map<Address, Bucket<T>>) success);
+ }
+ }
+ }, context.dispatcher());
+ }
+
+ void getBucketVersions(final Consumer<Map<Address, Long>> callback) {
+ Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ if (failure == null) {
+ callback.accept((Map<Address, Long>) success);
+ }
+ }
+ }, context.dispatcher());
+ }
+
+ @SuppressWarnings("unchecked")
+ void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
+ context.parent().tell(updateRemoteBucketsMessage((Map<Address, Bucket<?>>) buckets), ActorRef.noSender());
+ }
+
+ void removeRemoteBucket(final Address addr) {
+ context.parent().tell(removeBucketMessage(addr), ActorRef.noSender());
+ }
+
+ @VisibleForTesting
+ public enum Singletons {
+ // Sent from Gossiper to BucketStore, response is an immutable Map<Address, Bucket<?>>
+ GET_ALL_BUCKETS,
+ // Sent from Gossiper to BucketStore, response is an immutable Map<Address, Long>
+ GET_BUCKET_VERSIONS,
+ }
+}
package org.opendaylight.controller.remote.rpc.registry.gossip;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.SetMultimap;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Set;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
/**
* A store that syncs its data across nodes in the cluster.
* <p>
* Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
- *
*/
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
+public abstract class BucketStoreActor<T extends BucketData<T>> extends
+ AbstractUntypedPersistentActorWithMetering {
+ // Internal marker interface for messages which are just bridges to execute a method
+ @FunctionalInterface
+ private interface ExecuteInActor extends Consumer<BucketStoreActor<?>> {
+
+ }
+
/**
* Buckets owned by other known nodes in the cluster.
*/
private Integer incarnation;
private boolean persisting;
- public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
+ protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
this.config = Preconditions.checkNotNull(config);
this.initialData = Preconditions.checkNotNull(initialData);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
}
+ static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
+ return actor -> actor.getBucketsByMembers(members);
+ }
+
+ static ExecuteInActor removeBucketMessage(final Address addr) {
+ return actor -> actor.removeBucket(addr);
+ }
+
+ static ExecuteInActor updateRemoteBucketsMessage(final Map<Address, Bucket<?>> buckets) {
+ return actor -> actor.updateRemoteBuckets(buckets);
+ }
+
+ public final T getLocalData() {
+ return getLocalBucket().getData();
+ }
+
+ public final Map<Address, Bucket<T>> getRemoteBuckets() {
+ return remoteBuckets;
+ }
+
+ public final Map<Address, Long> getVersions() {
+ return versions;
+ }
+
@Override
- public String persistenceId() {
+ public final String persistenceId() {
return persistenceId;
}
}
}
- @SuppressWarnings("unchecked")
@Override
protected void handleCommand(final Object message) throws Exception {
- if (message instanceof GetAllBuckets) {
+ if (GET_ALL_BUCKETS == message) {
// GetAllBuckets is used only in testing
- receiveGetAllBuckets();
+ getSender().tell(getAllBuckets(), self());
return;
}
return;
}
- if (message instanceof GetBucketsByMembers) {
- receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
- } else if (message instanceof GetBucketVersions) {
- receiveGetBucketVersions();
- } else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
- } else if (message instanceof RemoveRemoteBucket) {
- removeBucket(((RemoveRemoteBucket) message).getAddress());
+ if (message instanceof ExecuteInActor) {
+ ((ExecuteInActor) message).accept(this);
+ } else if (GET_BUCKET_VERSIONS == message) {
+ // FIXME: do we need to send ourselves?
+ getSender().tell(ImmutableMap.copyOf(versions), getSelf());
} else if (message instanceof Terminated) {
actorTerminated((Terminated) message);
} else if (message instanceof DeleteSnapshotsSuccess) {
}
@Override
- protected void handleRecover(final Object message) throws Exception {
+ protected final void handleRecover(final Object message) throws Exception {
if (message instanceof RecoveryCompleted) {
if (incarnation != null) {
incarnation = incarnation + 1;
}
}
- protected RemoteRpcProviderConfig getConfig() {
+ protected final RemoteRpcProviderConfig getConfig() {
return config;
}
+ protected final void updateLocalBucket(final T data) {
+ final LocalBucket<T> local = getLocalBucket();
+ final boolean bumpIncarnation = local.setData(data);
+ versions.put(selfAddress, local.getVersion());
+
+ if (bumpIncarnation) {
+ LOG.debug("Version wrapped. incrementing incarnation");
+
+ Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+ incarnation = incarnation + 1;
+
+ persisting = true;
+ saveSnapshot(incarnation);
+ }
+ }
+
/**
- * Returns all the buckets the this node knows about, self owned + remote.
+ * Callback to subclasses invoked when a bucket is removed.
+ *
+ * @param address Remote address
+ * @param bucket Bucket removed
*/
- @VisibleForTesting
- protected void receiveGetAllBuckets() {
- final ActorRef sender = getSender();
- sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
- }
+ protected abstract void onBucketRemoved(final Address address, final Bucket<T> bucket);
+
+ /**
+ * Callback to subclasses invoked when the set of remote buckets is updated.
+ *
+ * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+ */
+ protected abstract void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets);
/**
* Helper to collect all known buckets.
*
* @return self owned + remote buckets
*/
- Map<Address, Bucket<T>> getAllBuckets() {
+ private Map<Address, Bucket<T>> getAllBuckets() {
Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
return all;
}
- /**
- * Returns buckets for requested members that this node knows about.
- *
- * @param members requested members
- */
- void receiveGetBucketsByMembers(final Set<Address> members) {
- final ActorRef sender = getSender();
- Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
- sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
- }
-
/**
* Helper to collect buckets for requested members.
*
* @param members requested members
- * @return buckets for requested members
*/
- Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
+ private void getBucketsByMembers(final Collection<Address> members) {
Map<Address, Bucket<T>> buckets = new HashMap<>();
//first add the local bucket if asked
}
}
- return buckets;
+ getSender().tell(buckets, getSelf());
}
- /**
- * Returns versions for all buckets known.
- */
- void receiveGetBucketVersions() {
- final ActorRef sender = getSender();
- GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
- sender.tell(reply, getSelf());
+ private void removeBucket(final Address addr) {
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+ onBucketRemoved(addr, bucket);
+ }
}
/**
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+ @VisibleForTesting
+ void updateRemoteBuckets(final Map<Address, Bucket<?>> receivedBuckets) {
LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty()) {
//nothing to do
}
final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
- for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+ for (Entry<Address, Bucket<?>> entry : receivedBuckets.entrySet()) {
final Address addr = entry.getKey();
if (selfAddress.equals(addr)) {
continue;
}
- final Bucket<T> receivedBucket = entry.getValue();
+ @SuppressWarnings("unchecked")
+ final Bucket<T> receivedBucket = (Bucket<T>) entry.getValue();
if (receivedBucket == null) {
LOG.debug("Ignoring null bucket from {}", addr);
continue;
}
}
- private void removeBucket(final Address addr) {
- final Bucket<T> bucket = remoteBuckets.remove(addr);
- if (bucket != null) {
- bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
- onBucketRemoved(addr, bucket);
- }
- }
-
private void actorTerminated(final Terminated message) {
LOG.info("Actor termination {} received", message);
}
}
- /**
- * Callback to subclasses invoked when a bucket is removed.
- *
- * @param address Remote address
- * @param bucket Bucket removed
- */
- protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
- // Default noop
- }
-
- /**
- * Callback to subclasses invoked when the set of remote buckets is updated.
- *
- * @param newBuckets Map of address to new bucket. Never null, but can be empty.
- */
- protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
- // Default noop
- }
-
@VisibleForTesting
protected boolean isPersisting() {
return persisting;
}
- public T getLocalData() {
- return getLocalBucket().getData();
- }
-
private LocalBucket<T> getLocalBucket() {
Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
return localBucket;
}
-
- protected void updateLocalBucket(final T data) {
- final LocalBucket<T> local = getLocalBucket();
- final boolean bumpIncarnation = local.setData(data);
- versions.put(selfAddress, local.getVersion());
-
- if (bumpIncarnation) {
- LOG.debug("Version wrapped. incrementing incarnation");
-
- Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
- incarnation = incarnation + 1;
-
- persisting = true;
- saveSnapshot(incarnation);
- }
- }
-
- public Map<Address, Bucket<T>> getRemoteBuckets() {
- return remoteBuckets;
- }
-
- public Map<Address, Long> getVersions() {
- return versions;
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.Address;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.util.Map;
+
+final class GossipEnvelope implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Map<Address, Bucket<?>> buckets;
+ private final Address from;
+ private final Address to;
+
+ GossipEnvelope(final Address from, final Address to, final Map<Address, ? extends Bucket<?>> buckets) {
+ this.to = Preconditions.checkNotNull(to);
+ this.buckets = ImmutableMap.copyOf(buckets);
+ this.from = from;
+ }
+
+ Map<Address, Bucket<?>> buckets() {
+ return buckets;
+ }
+
+ Address from() {
+ return from;
+ }
+
+ Address to() {
+ return to;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.Address;
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.util.Map;
+
+final class GossipStatus implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Map<Address, Long> versions;
+ private final Address from;
+
+ GossipStatus(final Address from, final Map<Address, Long> versions) {
+ this.versions = ImmutableMap.copyOf(versions);
+ this.from = from;
+ }
+
+ Address from() {
+ return from;
+ }
+
+ Map<Address, Long> versions() {
+ return versions;
+ }
+}
\ No newline at end of file
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
* for update.
*/
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private static final Object GOSSIP_TICK = new Object() {
+ @Override
+ public String toString() {
+ return "gossip tick";
+ }
+ };
+
private final boolean autoStartGossipTicks;
private final RemoteRpcProviderConfig config;
private Cancellable gossipTask;
+ private BucketStoreAccess bucketStore;
+
Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
this.config = Preconditions.checkNotNull(config);
this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
+ bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+
if (provider instanceof ClusterActorRefProvider ) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
+ GOSSIP_TICK, //message
getContext().dispatcher(), //execution context
getSelf() //sender
);
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick) {
+ if (GOSSIP_TICK.equals(message)) {
receiveGossipTick();
} else if (message instanceof GossipStatus) {
// Message from remote gossiper with its bucket versions
private void removePeer(final Address address) {
clusterMembers.remove(address);
peers.remove(address);
- getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
+ bucketStore.removeRemoteBucket(address);
}
/**
@VisibleForTesting
void receiveGossipStatus(final GossipStatus status) {
// Don't accept messages from non-members
- if (!peers.containsKey(status.from())) {
- return;
+ if (peers.containsKey(status.from())) {
+ // FIXME: sender should be part of GossipStatus
+ final ActorRef sender = getSender();
+ bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
+ }
+ }
+
+ private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
+ final Map<Address, Long> localVersions) {
+ final Map<Address, Long> remoteVersions = status.versions();
+
+ //diff between remote list and local
+ final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
+ localIsOlder.removeAll(localVersions.keySet());
+
+ //diff between local list and remote
+ final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
+ localIsNewer.removeAll(remoteVersions.keySet());
+
+
+ for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
+ Address address = entry.getKey();
+ Long remoteVersion = entry.getValue();
+ Long localVersion = localVersions.get(address);
+ if (localVersion == null || remoteVersion == null) {
+ //this condition is taken care of by above diffs
+ continue;
+ }
+
+ if (localVersion < remoteVersion) {
+ localIsOlder.add(address);
+ } else if (localVersion > remoteVersion) {
+ localIsNewer.add(address);
+ }
}
- final ActorRef sender = getSender();
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+ if (!localIsOlder.isEmpty()) {
+ remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
+ }
- futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
+ if (!localIsNewer.isEmpty()) {
+ //send newer buckets to remote
+ bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
+ LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
+ remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
+ });
+ }
}
/**
* @param envelope contains buckets from a remote gossiper
*/
@VisibleForTesting
- <T extends BucketData<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
+ void receiveGossip(final GossipEnvelope envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
return;
}
- updateRemoteBuckets(envelope.getBuckets());
+ updateRemoteBuckets(envelope.buckets());
}
/**
* @param buckets map of Buckets to update
*/
@VisibleForTesting
- <T extends BucketData<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
- getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
- }
-
- /**
- * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
- *
- * @param remote remote node to send Buckets to
- * @param addresses node addresses whose buckets needs to be sent
- */
- void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
-
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
- futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
+ void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
+ bucketStore.updateRemoteBuckets(buckets);
}
/**
*/
@VisibleForTesting
void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
-
- //Get local status from bucket store and send to remote
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
-
- LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
-
- futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
- }
-
- ///
- /// Private factories to create mappers
- ///
-
- private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Process bucket versions received from
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
- * Then this method compares remote bucket versions with local bucket versions.
- * <ul>
- * <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- * to remote
- * <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * to remote so that remote sends GossipEnvelop.
- * </ul>
- *
- * @param sender the remote member
- * @param status bucket versions from a remote member
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
-
- final Map<Address, Long> remoteVersions = status.getVersions();
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- //diff between remote list and local
- Set<Address> localIsOlder = new HashSet<>();
- localIsOlder.addAll(remoteVersions.keySet());
- localIsOlder.removeAll(localVersions.keySet());
-
- //diff between local list and remote
- Set<Address> localIsNewer = new HashSet<>();
- localIsNewer.addAll(localVersions.keySet());
- localIsNewer.removeAll(remoteVersions.keySet());
-
-
- for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
- Address address = entry.getKey();
- Long remoteVersion = entry.getValue();
- Long localVersion = localVersions.get(address);
- if (localVersion == null || remoteVersion == null) {
- //this condition is taken care of by above diffs
- continue;
- }
-
- if (localVersion < remoteVersion) {
- localIsOlder.add(address);
- } else if (localVersion > remoteVersion) {
- localIsNewer.add(address);
- }
- }
-
- if (!localIsOlder.isEmpty()) {
- sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
- }
-
- if (!localIsNewer.isEmpty()) {
- //send newer buckets to remote
- sendGossipTo(sender, localIsNewer);
- }
- }
- return null;
- }
- };
- }
-
- /**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
- * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
- * These buckets are sent to a remote member encapsulated in
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- *
- * @param sender the remote member that sent
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * in reply to which bucket is being sent back
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
-
- return new Mapper<Object, Void>() {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Void apply(final Object msg) {
- if (msg instanceof GetBucketsByMembersReply) {
- Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
- GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
- sender.tell(envelope, getSelf());
- }
- return null;
- }
- };
+ bucketStore.getBucketVersions(versions -> {
+ LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
+ /*
+ * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
+ * but can we identify which bucket is the local one?
+ */
+ remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
+ });
}
///
*
* We are keeping a boxed version here, as we stick it into a map anyway.
*/
- private Long version;
+ private long version;
private T data;
// We bump versions only if we took a snapshot since last data update
return data;
}
- Long getVersion() {
+ long getVersion() {
return version;
}
boolean setData(final T data) {
this.data = Preconditions.checkNotNull(data);
- if (bumpVersion) {
- final long next = version.longValue() + 1;
- if ((next & 0xffff_ffffL) == 0) {
- return true;
- }
-
- version = next;
- bumpVersion = false;
+ if (!bumpVersion) {
+ return false;
}
- return false;
+
+ bumpVersion = false;
+ return (++version & 0xffff_ffffL) == 0;
}
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.registry.gossip;
-
-import akka.actor.Address;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
-
-/**
- * These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors.
- */
-public class Messages {
-
- public static class BucketStoreMessages {
-
- public static final class GetAllBuckets implements Serializable {
- private static final long serialVersionUID = 1L;
- }
-
- public static final class GetBucketsByMembers implements Serializable {
- private static final long serialVersionUID = 1L;
- private final Set<Address> members;
-
- public GetBucketsByMembers(final Set<Address> members) {
- Preconditions.checkArgument(members != null, "members can not be null");
- this.members = ImmutableSet.copyOf(members);
- }
-
- public Set<Address> getMembers() {
- return members;
- }
- }
-
- public static class ContainsBuckets<T extends BucketData<T>> implements Serializable {
- private static final long serialVersionUID = -4940160367495308286L;
-
- private final Map<Address, Bucket<T>> buckets;
-
- protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
- Preconditions.checkArgument(buckets != null, "buckets can not be null");
- this.buckets = Collections.unmodifiableMap(new HashMap<>(buckets));
- }
-
- public final Map<Address, Bucket<T>> getBuckets() {
- return buckets;
- }
- }
-
- public static final class GetAllBucketsReply<T extends BucketData<T>> extends ContainsBuckets<T> {
- private static final long serialVersionUID = 1L;
-
- public GetAllBucketsReply(final Map<Address, Bucket<T>> buckets) {
- super(buckets);
- }
- }
-
- public static final class GetBucketsByMembersReply<T extends BucketData<T>> extends ContainsBuckets<T> {
- private static final long serialVersionUID = 1L;
-
- public GetBucketsByMembersReply(final Map<Address, Bucket<T>> buckets) {
- super(buckets);
- }
- }
-
- public static final class GetBucketVersions implements Serializable {
- private static final long serialVersionUID = 1L;
- }
-
- public static class ContainsBucketVersions implements Serializable {
- private static final long serialVersionUID = -8172148925383801613L;
-
- Map<Address, Long> versions;
-
- public ContainsBucketVersions(final Map<Address, Long> versions) {
- Preconditions.checkArgument(versions != null, "versions can not be null or empty");
-
- this.versions = ImmutableMap.copyOf(versions);
- }
-
- public Map<Address, Long> getVersions() {
- return versions;
- }
- }
-
- public static final class GetBucketVersionsReply extends ContainsBucketVersions {
- private static final long serialVersionUID = 1L;
-
- public GetBucketVersionsReply(final Map<Address, Long> versions) {
- super(versions);
- }
- }
-
- public static final class UpdateRemoteBuckets<T extends BucketData<T>> extends ContainsBuckets<T> {
- private static final long serialVersionUID = 1L;
-
- public UpdateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
- super(buckets);
- }
- }
-
- /**
- * Message sent from the gossiper to its parent, therefore not Serializable, requesting removal
- * of a bucket corresponding to an address.
- */
- public static final class RemoveRemoteBucket {
- private final Address address;
-
- public RemoveRemoteBucket(final Address address) {
- this.address = Preconditions.checkNotNull(address);
- }
-
- public Address getAddress() {
- return address;
- }
- }
- }
-
- public static class GossiperMessages {
- public static class Tick implements Serializable {
- private static final long serialVersionUID = -4770935099506366773L;
- }
-
- public static final class GossipTick extends Tick {
- private static final long serialVersionUID = 5803354404380026143L;
- }
-
- public static final class GossipStatus extends ContainsBucketVersions {
- private static final long serialVersionUID = -593037395143883265L;
-
- private final Address from;
-
- public GossipStatus(final Address from, final Map<Address, Long> versions) {
- super(versions);
- this.from = from;
- }
-
- public Address from() {
- return from;
- }
- }
-
- public static final class GossipEnvelope<T extends BucketData<T>> extends ContainsBuckets<T> {
- private static final long serialVersionUID = 8346634072582438818L;
-
- private final Address from;
- private final Address to;
-
- public GossipEnvelope(final Address from, final Address to, final Map<Address, Bucket<T>> buckets) {
- super(buckets);
- Preconditions.checkArgument(to != null, "Recipient of message must not be null");
- this.to = to;
- this.from = from;
- }
-
- public Address from() {
- return from;
- }
-
- public Address to() {
- return to;
- }
- }
- }
-}
import java.util.Map.Entry;
import java.util.Set;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
protected final Logger log = LoggerFactory.getLogger(getClass());
- private static final String NULL_CONSTANT = "null";
-
private static final String LOCAL_CONSTANT = "local";
private static final String ROUTE_CONSTANT = "route:";
public Set<String> getGlobalRpc() {
RoutingTable table = rpcRegistry.getLocalData();
Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
- for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
- if (route.getRoute() == null) {
- globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT);
+ for (DOMRpcIdentifier route : table.getRoutes()) {
+ if (route.getContextReference().isEmpty()) {
+ globalRpc.add(route.getType().toString());
}
}
public Set<String> getLocalRegisteredRoutedRpc() {
RoutingTable table = rpcRegistry.getLocalData();
Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
- for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
- if (route.getRoute() != null) {
+ for (DOMRpcIdentifier route : table.getRoutes()) {
+ if (!route.getContextReference().isEmpty()) {
StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
- builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null
- ? route.getType().toString() : NULL_CONSTANT);
+ builder.append(route.getContextReference().toString()).append(NAME_CONSTANT).append(route.getType());
routedRpc.add(builder.toString());
}
}
*/
private static Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
final String address) {
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+ Set<DOMRpcIdentifier> routes = table.getRoutes();
Map<String, String> rpcMap = new HashMap<>(routes.size());
- for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
- if (route.getRoute() != null) {
- String routeString = route.getRoute().toString();
+ for (DOMRpcIdentifier route : routes) {
+ if (!route.getContextReference().isEmpty()) {
+ String routeString = route.getContextReference().toString();
if (routeString.contains(routeName)) {
StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
- builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null
- ? route.getType().toString() : NULL_CONSTANT);
+ builder.append(routeString).append(NAME_CONSTANT).append(route.getType());
rpcMap.put(builder.toString(), address);
}
}
*/
private static Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
final String address) {
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+ Set<DOMRpcIdentifier> routes = table.getRoutes();
Map<String, String> rpcMap = new HashMap<>(routes.size());
- for (RpcRouter.RouteIdentifier<?, ?, ?> route : routes) {
- if (route.getType() != null) {
+ for (DOMRpcIdentifier route : routes) {
+ if (!route.getContextReference().isEmpty()) {
String type = route.getType().toString();
if (type.contains(name)) {
StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
- builder.append(route.getRoute() != null ? route.getRoute().toString() : NULL_CONSTANT)
- .append(NAME_CONSTANT).append(type);
+ builder.append(route.getContextReference()).append(NAME_CONSTANT).append(type);
rpcMap.put(builder.toString(), address);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.utils;
-
-import akka.actor.ActorRef;
-import com.google.common.base.Predicate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConditionalProbe {
- private final ActorRef actorRef;
- private final Predicate<Object> predicate;
- Logger log = LoggerFactory.getLogger(ConditionalProbe.class);
-
- public ConditionalProbe(ActorRef actorRef, Predicate<Object> predicate) {
- this.actorRef = actorRef;
- this.predicate = predicate;
- }
-
- public void tell(Object message, ActorRef sender) {
- if (predicate.apply(message)) {
- log.info("sending message to probe {}", message);
- actorRef.tell(message, sender);
- }
- }
-}
package org.opendaylight.controller.remote.rpc.registry;
import static org.junit.Assert.fail;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
// Add rpc on node 1
- List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
+ List<DOMRpcIdentifier> addedRouteIds = createRouteIds();
registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
LOG.info("testRpcAddRemoveInCluster starting");
- List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
+ List<DOMRpcIdentifier> addedRouteIds = createRouteIds();
Address node1Address = node1.provider().getDefaultAddress();
buckets = retrieveBuckets(registry1, testKit, address);
try {
- verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
+ verifyBucket(buckets.get(address), Collections.emptyList());
break;
} catch (AssertionError e) {
if (++numTries >= 50) {
final JavaTestKit testKit = new JavaTestKit(node3);
// Add rpc on node 1
- List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
+ List<DOMRpcIdentifier> addedRouteIds1 = createRouteIds();
registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());
final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
UpdateRemoteEndpoints.class);
// Add rpc on node 2
- List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
+ List<DOMRpcIdentifier> addedRouteIds2 = createRouteIds();
registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());
final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
}
private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final JavaTestKit testKit) {
- bucketStore.tell(new GetBucketVersions(), testKit.getRef());
- GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
- GetBucketVersionsReply.class);
- return reply.getVersions();
+ bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ final Map<Address, Long> reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), Map.class);
+ return reply;
}
- private static void verifyBucket(final Bucket<RoutingTable> bucket,
- final List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+ private static void verifyBucket(final Bucket<RoutingTable> bucket, final List<DOMRpcIdentifier> expRouteIds) {
RoutingTable table = bucket.getData();
Assert.assertNotNull("Bucket RoutingTable is null", table);
- for (RouteIdentifier<?, ?, ?> r : expRouteIds) {
+ for (DOMRpcIdentifier r : expRouteIds) {
if (!table.contains(r)) {
Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
}
final JavaTestKit testKit, final Address... addresses) {
int numTries = 0;
while (true) {
- bucketStore.tell(new GetAllBuckets(), testKit.getRef());
+ bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());
@SuppressWarnings("unchecked")
- GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
- GetAllBucketsReply.class);
+ Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ Map.class);
- Map<Address, Bucket<RoutingTable>> buckets = reply.getBuckets();
boolean foundAll = true;
for (Address addr : addresses) {
Bucket<RoutingTable> bucket = buckets.get(addr);
final JavaTestKit testKit = new JavaTestKit(node1);
final int nRoutes = 500;
- final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>(nRoutes);
for (int i = 0; i < nRoutes; i++) {
- final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
- new QName(new URI("/mockrpc"), "type" + i), null);
- added[i] = routeId;
+ final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true,
+ new QName(new URI("/mockrpc"), "type" + i)));
+ added.add(routeId);
//Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
+ registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)),
ActorRef.noSender());
}
- GetAllBuckets getAllBuckets = new GetAllBuckets();
FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
int numTries = 0;
while (true) {
- registry1.tell(getAllBuckets, testKit.getRef());
+ registry1.tell(GET_ALL_BUCKETS, testKit.getRef());
@SuppressWarnings("unchecked")
- GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+ Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(duration, Map.class);
- Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
+ Bucket<RoutingTable> localBucket = buckets.values().iterator().next();
RoutingTable table = localBucket.getData();
if (table != null && table.size() == nRoutes) {
- for (RouteIdentifier<?, ?, ?> r : added) {
- Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
+ for (DOMRpcIdentifier r : added) {
+ Assert.assertTrue("RoutingTable contains " + r, table.contains(r));
}
break;
}
}
- private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
+ private List<DOMRpcIdentifier> createRouteIds() throws URISyntaxException {
QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
- List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>(1);
- routeIds.add(new RouteIdentifierImpl(null, type, null));
+ List<DOMRpcIdentifier> routeIds = new ArrayList<>(1);
+ routeIds.add(DOMRpcIdentifier.create(SchemaPath.create(true, type)));
return routeIds;
}
}
*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
-import static akka.actor.ActorRef.noSender;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.Status.Success;
-import akka.pattern.Patterns;
-import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import scala.concurrent.Await;
-import scala.concurrent.duration.Duration;
public class BucketStoreTest {
private static ActorSystem system;
- private JavaTestKit kit;
-
@BeforeClass
public static void setup() {
system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
JavaTestKit.shutdownActorSystem(system);
}
- @Before
- public void before() {
- kit = new JavaTestKit(system);
- }
-
- @After
- public void after() {
- kit.shutdown(system);
- }
-
/**
* Given remote buckets, should merge with local copy of remote buckets.
*/
@Test
- public void testReceiveUpdateRemoteBuckets() throws Exception {
+ public void testReceiveUpdateRemoteBuckets() {
+
+ final BucketStoreActor<T> store = createStore();
- final ActorRef store = createStore();
Address localAddress = system.provider().getDefaultAddress();
Bucket<T> localBucket = new BucketImpl<>(0L, new T());
- Address a1 = new Address("tcp", "system1");
- Address a2 = new Address("tcp", "system2");
- Address a3 = new Address("tcp", "system3");
+ final Address a1 = new Address("tcp", "system1");
+ final Address a2 = new Address("tcp", "system2");
+ final Address a3 = new Address("tcp", "system3");
- Bucket<T> b1 = new BucketImpl<>(0L, new T());
- Bucket<T> b2 = new BucketImpl<>(0L, new T());
- Bucket<T> b3 = new BucketImpl<>(0L, new T());
-
- Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
- remoteBuckets.put(a1, b1);
- remoteBuckets.put(a2, b2);
- remoteBuckets.put(a3, b3);
- remoteBuckets.put(localAddress, localBucket);
-
- Await.result(Patterns.ask(store, new WaitUntilDonePersisting(),
- Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf());
+ final Bucket<T> b1 = new BucketImpl<>(0L, new T());
+ final Bucket<T> b2 = new BucketImpl<>(0L, new T());
+ final Bucket<T> b3 = new BucketImpl<>(0L, new T());
//Given remote buckets
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(ImmutableMap.of(a1, b1, a2, b2, localAddress, localBucket));
- //Should contain local bucket
- //Should contain 4 entries i.e a1, a2, a3, local
- Map<Address, Bucket<T>> remoteBucketsInStore = getBuckets(store);
- Assert.assertTrue(remoteBucketsInStore.size() == 4);
+ //Should NOT contain local bucket
+ //Should contain ONLY 3 entries i.e a1, a2
+ Map<Address, Bucket<T>> remoteBucketsInStore = store.getRemoteBuckets();
+ Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
+ Assert.assertTrue(remoteBucketsInStore.size() == 2);
//Add a new remote bucket
Address a4 = new Address("tcp", "system4");
Bucket<T> b4 = new BucketImpl<>(0L, new T());
- remoteBuckets.clear();
- remoteBuckets.put(a4, b4);
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(ImmutableMap.of(a4, b4));
//Should contain a4
- //Should contain 5 entries now i.e a1, a2, a3, a4, local
- remoteBucketsInStore = getBuckets(store);
+ //Should contain 4 entries now i.e a1, a2, a4
+ remoteBucketsInStore = store.getRemoteBuckets();
Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
- Assert.assertTrue(remoteBucketsInStore.size() == 5);
+ Assert.assertTrue(remoteBucketsInStore.size() == 3);
//Update a bucket
Bucket<T> b3New = new BucketImpl<>(0L, new T());
- remoteBuckets.clear();
+ Map<Address, Bucket<?>> remoteBuckets = new HashMap<>(3);
remoteBuckets.put(a3, b3New);
remoteBuckets.put(a1, null);
remoteBuckets.put(a2, null);
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(remoteBuckets);
//Should only update a3
- remoteBucketsInStore = getBuckets(store);
+ remoteBucketsInStore = store.getRemoteBuckets();
Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
- Assert.assertTrue(remoteBucketsInStore.size() == 5);
+ Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Should update versions map
//versions map contains versions for all remote buckets (4).
- Map<Address, Long> versionsInStore = getVersions(store);
+ Map<Address, Long> versionsInStore = store.getVersions();
Assert.assertEquals(4, versionsInStore.size());
Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
//Send older version of bucket
remoteBuckets.clear();
remoteBuckets.put(a3, b3);
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(remoteBuckets);
//Should NOT update a3
- remoteBucketsInStore = getBuckets(store);
+ remoteBucketsInStore = store.getRemoteBuckets();
b3InStore = remoteBucketsInStore.get(a3);
Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
+
}
/**
*
* @return instance of BucketStore class
*/
- private ActorRef createStore() {
- return kit.childActorOf(Props.create(TestingBucketStore.class,
- new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
- }
-
- @SuppressWarnings("unchecked")
- private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
- final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
- Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
- return result.getBuckets();
+ private static BucketStoreActor<T> createStore() {
+ final Props props = Props.create(TestingBucketStoreActor.class,
+ new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T());
+ return TestActorRef.<BucketStoreActor<T>>create(system, props, "testStore").underlyingActor();
}
- @SuppressWarnings("unchecked")
- private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
- return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
- Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
- }
+ private static final class TestingBucketStoreActor extends BucketStoreActor<T> {
- private static final class TestingBucketStore extends BucketStore<T> {
-
- private final List<ActorRef> toNotify = new ArrayList<>();
-
- TestingBucketStore(final RemoteRpcProviderConfig config,
- final String persistenceId,
- final T initialData) {
+ protected TestingBucketStoreActor(final RemoteRpcProviderConfig config,
+ final String persistenceId,
+ final T initialData) {
super(config, persistenceId, initialData);
}
@Override
- protected void handleCommand(Object message) throws Exception {
- if (message instanceof WaitUntilDonePersisting) {
- handlePersistAsk();
- } else if (message instanceof SaveSnapshotSuccess) {
- super.handleCommand(message);
- handleSnapshotSuccess();
- } else {
- super.handleCommand(message);
- }
- }
+ protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
- private void handlePersistAsk() {
- if (isPersisting()) {
- toNotify.add(getSender());
- } else {
- getSender().tell(new Success(null), noSender());
- }
}
- private void handleSnapshotSuccess() {
- toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
- }
- }
-
- /**
- * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
- */
- private static final class WaitUntilDonePersisting {
+ @Override
+ protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+ }
}
}
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
public class GossiperTest {
@After
public void resetMocks() {
reset(mockGossiper);
-
}
@Test
@SuppressWarnings("unchecked")
@Test
public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore() {
-
Address nonMember = new Address("tcp", "non-member");
GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class));
verify(mockGossiper, times(0)).getSender();
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings("unchecked")
@Test
public void testReceiveGossipWhenNotAddressedToSelfShouldIgnore() {
- Address notSelf = new Address("tcp", "not-self");
-
- GossipEnvelope envelope = new GossipEnvelope(notSelf, notSelf, mock(Map.class));
doNothing().when(mockGossiper).updateRemoteBuckets(anyMap());
- mockGossiper.receiveGossip(envelope);
+ Address notSelf = new Address("tcp", "not-self");
+ mockGossiper.receiveGossip(new GossipEnvelope(notSelf, notSelf, mock(Map.class)));
verify(mockGossiper, times(0)).updateRemoteBuckets(anyMap());
}
* @return instance of Gossiper class
*/
private static Gossiper createGossiper() {
- final Props props = Gossiper.testProps(new RemoteRpcProviderConfig(system.settings().config()));
+ final RemoteRpcProviderConfig config =
+ new RemoteRpcProviderConfig.Builder("unit-test")
+ .withConfigReader(ConfigFactory::load).build();
+ final Props props = Gossiper.testProps(config);
final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
return testRef.underlyingActor();