BUG 3566 : Get remote-rpc working again 14/22014/1
authorMoiz Raja <moraja@cisco.com>
Thu, 4 Jun 2015 17:18:30 +0000 (10:18 -0700)
committerTom Pantelis <tpanteli@brocade.com>
Sat, 6 Jun 2015 00:34:14 +0000 (00:34 +0000)
Some changes to moving from CompositeNode to NormalizedNode had broken
remote rpc. This patch attempts to get it working again.

Verified everything works in a 3 node cluster with the clustering-test-app

Change-Id: I2ec714f1d21d95812bd5b486260be3575df252a2
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 40217346551e20a809df37cd92955cd63c61944f)

opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/RoutedDOMRpcRoutingTableEntry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java [deleted file]

index e6df966..3d2eb97 100644 (file)
@@ -52,11 +52,23 @@ final class RoutedDOMRpcRoutingTableEntry extends AbstractDOMRpcRoutingTableEntr
             final Object value = key.getValue();
             if (value instanceof YangInstanceIdentifier) {
                 final YangInstanceIdentifier iid = (YangInstanceIdentifier) value;
-                final List<DOMRpcImplementation> impls = getImplementations(iid);
-                if (impls != null) {
-                    return impls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input);
+
+                // Find a DOMRpcImplementation for a specific iid
+                final List<DOMRpcImplementation> specificImpls = getImplementations(iid);
+                if (specificImpls != null) {
+                    return specificImpls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input);
+                }
+
+                LOG.debug("No implementation for context {} found will now look for wildcard id", iid);
+
+                // Find a DOMRpcImplementation for a wild card. Usually remote-rpc-connector would register an
+                // implementation this way
+                final List<DOMRpcImplementation> mayBeRemoteImpls = getImplementations(YangInstanceIdentifier.EMPTY);
+
+                if(mayBeRemoteImpls != null){
+                    return mayBeRemoteImpls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input);
                 }
-                LOG.debug("No implementation for context {} found", iid);
+
             } else {
                 LOG.warn("Ignoring wrong context value {}", value);
             }
index a6fdfd3..23d1e85 100644 (file)
@@ -76,8 +76,16 @@ public class RemoteRpcImplementation implements DOMRpcImplementation {
                 }
 
                 final RpcResponse rpcReply = (RpcResponse)reply;
-                final NormalizedNode<?, ?> result =
-                        NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
+                final NormalizedNode<?, ?> result;
+
+                if(rpcReply.getResultNormalizedNode() == null){
+                    result = null;
+                    LOG.debug("Received response for invoke rpc : {} result is null", rpcMsg.getRpc());
+                } else {
+                    result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
+                    LOG.debug("Received response for invoke rpc : {} result : {}", rpcMsg.getRpc(), result);
+                }
+
                 settableFuture.set(new DefaultDOMRpcResult(result));
             }
         };
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java
deleted file mode 100644 (file)
index c354320..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>{
-    private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
-    private final ActorRef rpcRegistry;
-
-    public RoutedRpcListener(final ActorRef rpcRegistry) {
-        Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null");
-        this.rpcRegistry = rpcRegistry;
-    }
-
-    @Override
-    public void onRouteChange(final RouteChange<RpcRoutingContext, YangInstanceIdentifier> routeChange) {
-        final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = routeChange.getAnnouncements();
-        if(announcements != null && announcements.size() > 0){
-            announce(getRouteIdentifiers(announcements));
-        }
-
-        final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = routeChange.getRemovals();
-        if(removals != null && removals.size() > 0 ) {
-            remove(getRouteIdentifiers(removals));
-        }
-    }
-
-    /**
-     *
-     * @param announcements
-     */
-    private void announce(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Announcing [{}]", announcements);
-        }
-        final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg =
-                new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
-        rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
-    }
-
-    /**
-     *
-     * @param removals
-     */
-    private void remove(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Removing [{}]", removals);
-        }
-        final RpcRegistry.Messages.RemoveRoutes removeRpcMsg =
-                new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
-        rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
-    }
-
-    /**
-     *
-     * @param changes
-     * @return
-     */
-    private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(
-            final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> changes) {
-
-        final Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
-        for (final RpcRoutingContext context : changes.keySet()){
-            for (final YangInstanceIdentifier instanceId : changes.get(context)){
-                final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
-                routeIdSet.add(routeId);
-            }
-        }
-        return routeIdSet;
-    }
-}
index 7ddac67..4dee5da 100644 (file)
@@ -138,6 +138,8 @@ public class RpcBroker extends AbstractUntypedActor {
                     return;
                 }
 
+                LOG.debug("Execute Rpc response received for rpc : {}, responding to sender : {}", msg.getRpc(), sender);
+
                 sender.tell(reply, self);
             }
         };
@@ -174,7 +176,15 @@ public class RpcBroker extends AbstractUntypedActor {
                     sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
                             message, errors)), self);
                 } else {
-                    final Node serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+                    final Node serializedResultNode;
+                    if(result.getResult() == null){
+                        serializedResultNode = null;
+                    } else {
+                        serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+                    }
+
+                    LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
+
                     sender.tell(new RpcResponse(serializedResultNode), self);
                 }
             }
index 28ff152..0d0335e 100644 (file)
@@ -40,7 +40,7 @@ public class RpcListener implements DOMRpcAvailabilityListener{
         final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
 
         for (final DOMRpcIdentifier rpc : rpcs) {
-            final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+            final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
             routeIds.add(routeId);
         }
         final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
@@ -55,7 +55,7 @@ public class RpcListener implements DOMRpcAvailabilityListener{
         }
         final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
         for (final DOMRpcIdentifier rpc : rpcs) {
-            final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+            final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
             routeIds.add(routeId);
         }
         final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
index f3cb78a..461bd00 100644 (file)
@@ -16,14 +16,18 @@ import akka.actor.SupervisorStrategy;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -45,7 +49,6 @@ public class RpcManager extends AbstractUntypedActor {
     private ActorRef rpcRegistry;
     private final RemoteRpcProviderConfig config;
     private RpcListener rpcListener;
-    private RoutedRpcListener routeChangeListener;
     private RemoteRpcImplementation rpcImplementation;
     private final DOMRpcProviderService rpcProvisionRegistry;
     private final DOMRpcService rpcServices;
@@ -90,16 +93,28 @@ public class RpcManager extends AbstractUntypedActor {
         LOG.debug("Registers rpc listeners");
 
         rpcListener = new RpcListener(rpcRegistry);
-        routeChangeListener = new RoutedRpcListener(rpcRegistry);
         rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
 
         rpcServices.registerRpcListener(rpcListener);
 
-//        rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-//        rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+        registerRoutedRpcDelegate();
         announceSupportedRpcs();
     }
 
+    private void registerRoutedRpcDelegate() {
+        Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
+        Set<Module> modules = schemaContext.getModules();
+        for(Module module : modules){
+            for(RpcDefinition rpcDefinition : module.getRpcs()){
+                if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
+                    LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
+                    rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
+                }
+            }
+        }
+        rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+    }
+
     /**
      * Add all the locally registered RPCs in the clustered routing table
      */
@@ -124,6 +139,7 @@ public class RpcManager extends AbstractUntypedActor {
 
     private void updateSchemaContext(final UpdateSchemaContext message) {
       schemaContext = message.getSchemaContext();
+      registerRoutedRpcDelegate();
       rpcBroker.tell(message, ActorRef.noSender());
     }
 
@@ -133,6 +149,8 @@ public class RpcManager extends AbstractUntypedActor {
           new Function<Throwable, SupervisorStrategy.Directive>() {
             @Override
             public SupervisorStrategy.Directive apply(final Throwable t) {
+              LOG.error("An exception happened actor will be resumed", t);
+
               return SupervisorStrategy.resume();
             }
           }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java
deleted file mode 100644 (file)
index 98a33bf..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import com.typesafe.config.ConfigFactory;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-public class RouteRpcListenerTest {
-
-  static ActorSystem system;
-
-
-  @BeforeClass
-  public static void setup() throws InterruptedException {
-    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
-  }
-
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(system);
-    system = null;
-  }
-
-  @Test
-  public void testRouteAdd() throws URISyntaxException, InterruptedException {
-    new JavaTestKit(system) {
-      {
-        // Test announcements
-        JavaTestKit probeReg = new JavaTestKit(system);
-        ActorRef rpcRegistry = probeReg.getRef();
-
-        RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry);
-
-        QName qName = new QName(new URI("actor2"), "actor2");
-        RpcRoutingContext context = RpcRoutingContext.create(qName, qName);
-        YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName));
-        rpcListener.onRouteChange(RoutingUtils.announcementChange(context, identifier));
-
-        probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
-      }};
-  }
-
-  @Test
-  public void testRouteRemove() throws URISyntaxException, InterruptedException {
-    new JavaTestKit(system) {
-      {
-        // Test announcements
-        JavaTestKit probeReg = new JavaTestKit(system);
-        ActorRef rpcRegistry = probeReg.getRef();
-
-        RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry);
-
-        QName qName = new QName(new URI("actor2"), "actor2");
-        RpcRoutingContext context = RpcRoutingContext.create(qName, qName);
-        YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName));
-        rpcListener.onRouteChange(RoutingUtils.removalChange(context, identifier));
-
-        probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
-      }};
-  }
-}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.