Teach sal-remoterpc-connector to route actions
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RoutingTable.java
index 5e19653a22d21ed83ae3eec370290dc37f50c1ce..8d67b3bdd311af7747657274d16717695de0eca1 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc.registry;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RoutingTable<I, R> {
-
-  private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class);
-
-  private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
-  private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
-
-  public ConcurrentMap<I, R> getGlobalRpcMap() {
-    return globalRpcMap;
-  }
-
-  public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
-    return routedRpcMap;
-  }
-
-  public R getGlobalRoute(final I routeId) {
-    Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
-    return globalRpcMap.get(routeId);
-  }
-
-  public void addGlobalRoute(final I routeId, final R route) {
-    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
-    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
-    LOG.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
-    if(globalRpcMap.putIfAbsent(routeId, route) != null) {
-      LOG.debug("A route already exist for route id [{}] ", routeId);
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
+
+public final class RoutingTable extends AbstractRoutingTable<RoutingTable, DOMRpcIdentifier> {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
+        private Collection<DOMRpcIdentifier> rpcs;
+        private ActorRef opsInvoker;
+
+        // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+        // be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final RoutingTable table) {
+            rpcs = table.getItems();
+            opsInvoker = table.getInvoker();
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeObject(Serialization.serializedActorPath(opsInvoker));
+
+            final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out);
+            nnout.writeInt(rpcs.size());
+            for (DOMRpcIdentifier id : rpcs) {
+                nnout.writeSchemaPath(id.getType());
+                nnout.writeYangInstanceIdentifier(id.getContextReference());
+            }
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            opsInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+
+            final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
+            final int size = nnin.readInt();
+            rpcs = new ArrayList<>(size);
+            for (int i = 0; i < size; ++i) {
+                rpcs.add(DOMRpcIdentifier.create(nnin.readSchemaPath(), nnin.readYangInstanceIdentifier()));
+            }
+        }
+
+        private Object readResolve() {
+            return new RoutingTable(opsInvoker, rpcs);
+        }
     }
-  }
-
-  public void removeGlobalRoute(final I routeId) {
-    Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
-    LOG.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
-    globalRpcMap.remove(routeId);
-  }
 
-  public Set<R> getRoutedRpc(final I routeId) {
-    Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
-    Set<R> routes = routedRpcMap.get(routeId);
+    private static final long serialVersionUID = 1L;
 
-    if (routes == null) {
-      return Collections.emptySet();
+    RoutingTable(final ActorRef invoker, final Collection<DOMRpcIdentifier> table) {
+        super(invoker, table);
     }
 
-    return ImmutableSet.copyOf(routes);
-  }
-
-  public R getLastAddedRoutedRpc(final I routeId) {
-
-    Set<R> routes = getRoutedRpc(routeId);
-
-    if (routes.isEmpty()) {
-      return null;
+    RoutingTable addRpcs(final Collection<DOMRpcIdentifier> toAdd) {
+        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(getItems());
+        newRpcs.addAll(toAdd);
+        return new RoutingTable(getInvoker(), newRpcs);
     }
 
-    R route = null;
-    Iterator<R> iter = routes.iterator();
-    while (iter.hasNext()) {
-      route = iter.next();
+    RoutingTable removeRpcs(final Collection<DOMRpcIdentifier> toRemove) {
+        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(getItems());
+        newRpcs.removeAll(toRemove);
+        return new RoutingTable(getInvoker(), newRpcs);
     }
 
-    return route;
-  }
-
-  public void addRoutedRpc(final I routeId, final R route)   {
-    Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
-    Preconditions.checkNotNull(route, "addRoute: route cannot be null");
-    LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
-    threadSafeAdd(routeId, route);
-  }
-
-  public void addRoutedRpcs(final Set<I> routeIds, final R route) {
-    Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
-    for (I routeId : routeIds){
-      addRoutedRpc(routeId, route);
-    }
-  }
-
-  public void removeRoute(final I routeId, final R route) {
-    Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
-    Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
-
-    LinkedHashSet<R> routes = routedRpcMap.get(routeId);
-    if (routes == null) {
-      return;
-    }
-    LOG.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
-    threadSafeRemove(routeId, route);
-  }
-
-  public void removeRoutes(final Set<I> routeIds, final R route) {
-    Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
-    for (I routeId : routeIds){
-      removeRoute(routeId, route);
-    }
-  }
-
-  /**
-   * This method guarantees that no 2 thread over write each other's changes.
-   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
-   */
-  private void threadSafeAdd(final I routeId, final R route) {
-
-    for (int i=0;i<100;i++){
-
-      LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
-      updatedRoutes.add(route);
-      LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
-      if (oldRoutes == null) {
-        return;
-      }
-
-      updatedRoutes = new LinkedHashSet<>(oldRoutes);
-      updatedRoutes.add(route);
-
-      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
-        return;
-      }
-    }
-    //the method did not already return means it failed to add route in 100 attempts
-    throw new IllegalStateException("Failed to add route [" + routeId + "]");
-  }
-
-  /**
-   * This method guarantees that no 2 thread over write each other's changes.
-   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
-   */
-  private void threadSafeRemove(final I routeId, final R route) {
-    LinkedHashSet<R> updatedRoutes = null;
-    for (int i=0;i<100;i++){
-      LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
-
-      // if route to be deleted is the only entry in the set then remove routeId from the cache
-      if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
-        routedRpcMap.remove(routeId);
-        return;
-      }
-
-      // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
-      updatedRoutes = new LinkedHashSet<>(oldRoutes);
-      updatedRoutes.remove(route);
-      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
-        return;
-      }
-
+    @Override
+    Object writeReplace() {
+        return new Proxy(this);
     }
-    //the method did not already return means it failed to remove route in 100 attempts
-    throw new IllegalStateException("Failed to remove route [" + routeId + "]");
-  }
 }