From: Moiz Raja Date: Thu, 4 Jun 2015 17:18:30 +0000 (-0700) Subject: BUG 3566 : Get remote-rpc working again X-Git-Tag: release/beryllium~517 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1b37a887301c6b9159ddb76492c4a3a820c0dfe8 BUG 3566 : Get remote-rpc working again Some changes to moving from CompositeNode to NormalizedNode had broken remote rpc. This patch attempts to get it working again. Verified everything works in a 3 node cluster with the clustering-test-app Change-Id: I2ec714f1d21d95812bd5b486260be3575df252a2 Signed-off-by: Moiz Raja (cherry picked from commit 40217346551e20a809df37cd92955cd63c61944f) --- diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/RoutedDOMRpcRoutingTableEntry.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/RoutedDOMRpcRoutingTableEntry.java index e6df966f36..3d2eb974c0 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/RoutedDOMRpcRoutingTableEntry.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/RoutedDOMRpcRoutingTableEntry.java @@ -52,11 +52,23 @@ final class RoutedDOMRpcRoutingTableEntry extends AbstractDOMRpcRoutingTableEntr final Object value = key.getValue(); if (value instanceof YangInstanceIdentifier) { final YangInstanceIdentifier iid = (YangInstanceIdentifier) value; - final List impls = getImplementations(iid); - if (impls != null) { - return impls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input); + + // Find a DOMRpcImplementation for a specific iid + final List specificImpls = getImplementations(iid); + if (specificImpls != null) { + return specificImpls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input); + } + + LOG.debug("No implementation for context {} found will now look for wildcard id", iid); + + // Find a DOMRpcImplementation for a wild card. Usually remote-rpc-connector would register an + // implementation this way + final List mayBeRemoteImpls = getImplementations(YangInstanceIdentifier.EMPTY); + + if(mayBeRemoteImpls != null){ + return mayBeRemoteImpls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input); } - LOG.debug("No implementation for context {} found", iid); + } else { LOG.warn("Ignoring wrong context value {}", value); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index a6fdfd3ff9..23d1e85e23 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -76,8 +76,16 @@ public class RemoteRpcImplementation implements DOMRpcImplementation { } final RpcResponse rpcReply = (RpcResponse)reply; - final NormalizedNode result = - NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode()); + final NormalizedNode result; + + if(rpcReply.getResultNormalizedNode() == null){ + result = null; + LOG.debug("Received response for invoke rpc : {} result is null", rpcMsg.getRpc()); + } else { + result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode()); + LOG.debug("Received response for invoke rpc : {} result : {}", rpcMsg.getRpc(), result); + } + settableFuture.set(new DefaultDOMRpcResult(result)); } }; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java deleted file mode 100644 index c354320b8b..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - - -import akka.actor.ActorRef; -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; -import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.core.api.RpcRoutingContext; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RoutedRpcListener implements RouteChangeListener{ - private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class); - private final ActorRef rpcRegistry; - - public RoutedRpcListener(final ActorRef rpcRegistry) { - Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null"); - this.rpcRegistry = rpcRegistry; - } - - @Override - public void onRouteChange(final RouteChange routeChange) { - final Map> announcements = routeChange.getAnnouncements(); - if(announcements != null && announcements.size() > 0){ - announce(getRouteIdentifiers(announcements)); - } - - final Map> removals = routeChange.getRemovals(); - if(removals != null && removals.size() > 0 ) { - remove(getRouteIdentifiers(removals)); - } - } - - /** - * - * @param announcements - */ - private void announce(final Set> announcements) { - if(LOG.isDebugEnabled()) { - LOG.debug("Announcing [{}]", announcements); - } - final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = - new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements)); - rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); - } - - /** - * - * @param removals - */ - private void remove(final Set> removals){ - if(LOG.isDebugEnabled()) { - LOG.debug("Removing [{}]", removals); - } - final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = - new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals)); - rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); - } - - /** - * - * @param changes - * @return - */ - private Set> getRouteIdentifiers( - final Map> changes) { - - final Set> routeIdSet = new HashSet<>(); - for (final RpcRoutingContext context : changes.keySet()){ - for (final YangInstanceIdentifier instanceId : changes.get(context)){ - final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId); - routeIdSet.add(routeId); - } - } - return routeIdSet; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 7ddac673c2..4dee5dabb7 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -138,6 +138,8 @@ public class RpcBroker extends AbstractUntypedActor { return; } + LOG.debug("Execute Rpc response received for rpc : {}, responding to sender : {}", msg.getRpc(), sender); + sender.tell(reply, self); } }; @@ -174,7 +176,15 @@ public class RpcBroker extends AbstractUntypedActor { sender.tell(new akka.actor.Status.Failure(new RpcErrorsException( message, errors)), self); } else { - final Node serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult()); + final Node serializedResultNode; + if(result.getResult() == null){ + serializedResultNode = null; + } else { + serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult()); + } + + LOG.debug("Sending response for execute rpc : {}", msg.getRpc()); + sender.tell(new RpcResponse(serializedResultNode), self); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java index 28ff1523cb..0d0335e18d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java @@ -40,7 +40,7 @@ public class RpcListener implements DOMRpcAvailabilityListener{ final List> routeIds = new ArrayList<>(); for (final DOMRpcIdentifier rpc : rpcs) { - final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null); + final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()); routeIds.add(routeId); } final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds); @@ -55,7 +55,7 @@ public class RpcListener implements DOMRpcAvailabilityListener{ } final List> routeIds = new ArrayList<>(); for (final DOMRpcIdentifier rpc : rpcs) { - final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null); + final RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()); routeIds.add(routeId); } final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index f3cb78a301..461bd00f98 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -16,14 +16,18 @@ import akka.actor.SupervisorStrategy; import akka.japi.Function; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -45,7 +49,6 @@ public class RpcManager extends AbstractUntypedActor { private ActorRef rpcRegistry; private final RemoteRpcProviderConfig config; private RpcListener rpcListener; - private RoutedRpcListener routeChangeListener; private RemoteRpcImplementation rpcImplementation; private final DOMRpcProviderService rpcProvisionRegistry; private final DOMRpcService rpcServices; @@ -90,16 +93,28 @@ public class RpcManager extends AbstractUntypedActor { LOG.debug("Registers rpc listeners"); rpcListener = new RpcListener(rpcRegistry); - routeChangeListener = new RoutedRpcListener(rpcRegistry); rpcImplementation = new RemoteRpcImplementation(rpcBroker, config); rpcServices.registerRpcListener(rpcListener); -// rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); -// rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation); + registerRoutedRpcDelegate(); announceSupportedRpcs(); } + private void registerRoutedRpcDelegate() { + Set rpcIdentifiers = new HashSet<>(); + Set modules = schemaContext.getModules(); + for(Module module : modules){ + for(RpcDefinition rpcDefinition : module.getRpcs()){ + if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) { + LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath()); + rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY)); + } + } + } + rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers); + } + /** * Add all the locally registered RPCs in the clustered routing table */ @@ -124,6 +139,7 @@ public class RpcManager extends AbstractUntypedActor { private void updateSchemaContext(final UpdateSchemaContext message) { schemaContext = message.getSchemaContext(); + registerRoutedRpcDelegate(); rpcBroker.tell(message, ActorRef.noSender()); } @@ -133,6 +149,8 @@ public class RpcManager extends AbstractUntypedActor { new Function() { @Override public SupervisorStrategy.Directive apply(final Throwable t) { + LOG.error("An exception happened actor will be resumed", t); + return SupervisorStrategy.resume(); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java deleted file mode 100644 index 98a33bf4e6..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import com.typesafe.config.ConfigFactory; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.sal.core.api.RpcRoutingContext; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; - -import java.net.URI; -import java.net.URISyntaxException; - -public class RouteRpcListenerTest { - - static ActorSystem system; - - - @BeforeClass - public static void setup() throws InterruptedException { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - @Test - public void testRouteAdd() throws URISyntaxException, InterruptedException { - new JavaTestKit(system) { - { - // Test announcements - JavaTestKit probeReg = new JavaTestKit(system); - ActorRef rpcRegistry = probeReg.getRef(); - - RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry); - - QName qName = new QName(new URI("actor2"), "actor2"); - RpcRoutingContext context = RpcRoutingContext.create(qName, qName); - YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName)); - rpcListener.onRouteChange(RoutingUtils.announcementChange(context, identifier)); - - probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); - }}; - } - - @Test - public void testRouteRemove() throws URISyntaxException, InterruptedException { - new JavaTestKit(system) { - { - // Test announcements - JavaTestKit probeReg = new JavaTestKit(system); - ActorRef rpcRegistry = probeReg.getRef(); - - RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry); - - QName qName = new QName(new URI("actor2"), "actor2"); - RpcRoutingContext context = RpcRoutingContext.create(qName, qName); - YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName)); - rpcListener.onRouteChange(RoutingUtils.removalChange(context, identifier)); - - probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); - }}; - } -}