X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRoutingTable.java;h=2f9513175776cd72b879437832ffa6c34d494f06;hp=5e19653a22d21ed83ae3eec370290dc37f50c1ce;hb=5b66dd8f5e3467a07e77b20fe696b29993ce5565;hpb=d255fdd0b14660a22ff63771d954ac3fe5d0cb7e diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index 5e19653a22..2f95131757 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -5,167 +5,132 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc.registry; +import akka.actor.ActorRef; +import akka.serialization.JavaSerializer; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashSet; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class RoutingTable { - - private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class); - - private ConcurrentMap globalRpcMap = new ConcurrentHashMap<>(); - private ConcurrentMap> routedRpcMap = new ConcurrentHashMap<>(); - - public ConcurrentMap getGlobalRpcMap() { - return globalRpcMap; - } - - public ConcurrentMap> getRoutedRpcMap() { - return routedRpcMap; - } - - public R getGlobalRoute(final I routeId) { - Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!"); - return globalRpcMap.get(routeId); - } - - public void addGlobalRoute(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!"); - Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!"); - LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route); - if(globalRpcMap.putIfAbsent(routeId, route) != null) { - LOG.debug("A route already exist for route id [{}] ", routeId); +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData; + +public final class RoutingTable implements BucketData, Serializable { + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.") + private Collection rpcs; + private ActorRef rpcInvoker; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final RoutingTable table) { + rpcs = table.getRoutes(); + rpcInvoker = table.getRpcInvoker(); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeObject(Serialization.serializedActorPath(rpcInvoker)); + + final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out); + nnout.writeInt(rpcs.size()); + for (DOMRpcIdentifier id : rpcs) { + nnout.writeSchemaPath(id.getType()); + nnout.writeYangInstanceIdentifier(id.getContextReference()); + } + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + rpcInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject()); + + final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in); + final int size = nnin.readInt(); + rpcs = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + rpcs.add(DOMRpcIdentifier.create(nnin.readSchemaPath(), nnin.readYangInstanceIdentifier())); + } + } + + private Object readResolve() { + return new RoutingTable(rpcInvoker, rpcs); + } } - } - public void removeGlobalRoute(final I routeId) { - Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!"); - LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId); - globalRpcMap.remove(routeId); - } + private static final long serialVersionUID = 1L; - public Set getRoutedRpc(final I routeId) { - Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!"); - Set routes = routedRpcMap.get(routeId); + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.") + private final Set rpcs; + private final ActorRef rpcInvoker; - if (routes == null) { - return Collections.emptySet(); + RoutingTable(final ActorRef rpcInvoker, final Collection table) { + this.rpcInvoker = Preconditions.checkNotNull(rpcInvoker); + this.rpcs = ImmutableSet.copyOf(table); } - return ImmutableSet.copyOf(routes); - } - - public R getLastAddedRoutedRpc(final I routeId) { - - Set routes = getRoutedRpc(routeId); - - if (routes.isEmpty()) { - return null; + @Override + public Optional getWatchActor() { + return Optional.of(rpcInvoker); } - R route = null; - Iterator iter = routes.iterator(); - while (iter.hasNext()) { - route = iter.next(); + public Set getRoutes() { + return rpcs; } - return route; - } - - public void addRoutedRpc(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null"); - Preconditions.checkNotNull(route, "addRoute: route cannot be null"); - LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route); - threadSafeAdd(routeId, route); - } - - public void addRoutedRpcs(final Set routeIds, final R route) { - Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null"); - for (I routeId : routeIds){ - addRoutedRpc(routeId, route); + ActorRef getRpcInvoker() { + return rpcInvoker; } - } - - public void removeRoute(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!"); - Preconditions.checkNotNull(route, "removeRoute: route cannot be null!"); - LinkedHashSet routes = routedRpcMap.get(routeId); - if (routes == null) { - return; + RoutingTable addRpcs(final Collection toAdd) { + final Set newRpcs = new HashSet<>(rpcs); + newRpcs.addAll(toAdd); + return new RoutingTable(rpcInvoker, newRpcs); } - LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route); - threadSafeRemove(routeId, route); - } - - public void removeRoutes(final Set routeIds, final R route) { - Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null"); - for (I routeId : routeIds){ - removeRoute(routeId, route); - } - } - - /** - * This method guarantees that no 2 thread over write each other's changes. - * Just so that we dont end up in infinite loop, it tries for 100 times then throw - */ - private void threadSafeAdd(final I routeId, final R route) { - for (int i=0;i<100;i++){ + RoutingTable removeRpcs(final Collection toRemove) { + final Set newRpcs = new HashSet<>(rpcs); + newRpcs.removeAll(toRemove); + return new RoutingTable(rpcInvoker, newRpcs); + } - LinkedHashSet updatedRoutes = new LinkedHashSet<>(); - updatedRoutes.add(route); - LinkedHashSet oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes); - if (oldRoutes == null) { - return; - } + private Object writeReplace() { + return new Proxy(this); + } - updatedRoutes = new LinkedHashSet<>(oldRoutes); - updatedRoutes.add(route); + @VisibleForTesting + boolean contains(final DOMRpcIdentifier routeId) { + return rpcs.contains(routeId); + } - if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { - return; - } + @VisibleForTesting + int size() { + return rpcs.size(); } - //the method did not already return means it failed to add route in 100 attempts - throw new IllegalStateException("Failed to add route [" + routeId + "]"); - } - - /** - * This method guarantees that no 2 thread over write each other's changes. - * Just so that we dont end up in infinite loop, it tries for 100 times then throw - */ - private void threadSafeRemove(final I routeId, final R route) { - LinkedHashSet updatedRoutes = null; - for (int i=0;i<100;i++){ - LinkedHashSet oldRoutes = routedRpcMap.get(routeId); - - // if route to be deleted is the only entry in the set then remove routeId from the cache - if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){ - routedRpcMap.remove(routeId); - return; - } - - // if there are multiple routes for this routeId, remove the route to be deleted only from the set. - updatedRoutes = new LinkedHashSet<>(oldRoutes); - updatedRoutes.remove(route); - if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { - return; - } + @Override + public String toString() { + return "RoutingTable{" + "rpcs=" + rpcs + ", rpcInvoker=" + rpcInvoker + '}'; } - //the method did not already return means it failed to remove route in 100 attempts - throw new IllegalStateException("Failed to remove route [" + routeId + "]"); - } }