final Object value = key.getValue();
if (value instanceof YangInstanceIdentifier) {
final YangInstanceIdentifier iid = (YangInstanceIdentifier) value;
- final List<DOMRpcImplementation> impls = getImplementations(iid);
- if (impls != null) {
- return impls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input);
+
+ // Find a DOMRpcImplementation for a specific iid
+ final List<DOMRpcImplementation> specificImpls = getImplementations(iid);
+ if (specificImpls != null) {
+ return specificImpls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input);
+ }
+
+ LOG.debug("No implementation for context {} found will now look for wildcard id", iid);
+
+ // Find a DOMRpcImplementation for a wild card. Usually remote-rpc-connector would register an
+ // implementation this way
+ final List<DOMRpcImplementation> mayBeRemoteImpls = getImplementations(YangInstanceIdentifier.EMPTY);
+
+ if(mayBeRemoteImpls != null){
+ return mayBeRemoteImpls.get(0).invokeRpc(DOMRpcIdentifier.create(getSchemaPath(), iid), input);
}
- LOG.debug("No implementation for context {} found", iid);
+
} else {
LOG.warn("Ignoring wrong context value {}", value);
}
}
final RpcResponse rpcReply = (RpcResponse)reply;
- final NormalizedNode<?, ?> result =
- NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
+ final NormalizedNode<?, ?> result;
+
+ if(rpcReply.getResultNormalizedNode() == null){
+ result = null;
+ LOG.debug("Received response for invoke rpc : {} result is null", rpcMsg.getRpc());
+ } else {
+ result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
+ LOG.debug("Received response for invoke rpc : {} result : {}", rpcMsg.getRpc(), result);
+ }
+
settableFuture.set(new DefaultDOMRpcResult(result));
}
};
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>{
- private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
- private final ActorRef rpcRegistry;
-
- public RoutedRpcListener(final ActorRef rpcRegistry) {
- Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null");
- this.rpcRegistry = rpcRegistry;
- }
-
- @Override
- public void onRouteChange(final RouteChange<RpcRoutingContext, YangInstanceIdentifier> routeChange) {
- final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = routeChange.getAnnouncements();
- if(announcements != null && announcements.size() > 0){
- announce(getRouteIdentifiers(announcements));
- }
-
- final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = routeChange.getRemovals();
- if(removals != null && removals.size() > 0 ) {
- remove(getRouteIdentifiers(removals));
- }
- }
-
- /**
- *
- * @param announcements
- */
- private void announce(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Announcing [{}]", announcements);
- }
- final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg =
- new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
- rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
- }
-
- /**
- *
- * @param removals
- */
- private void remove(final Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
- if(LOG.isDebugEnabled()) {
- LOG.debug("Removing [{}]", removals);
- }
- final RpcRegistry.Messages.RemoveRoutes removeRpcMsg =
- new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
- rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
- }
-
- /**
- *
- * @param changes
- * @return
- */
- private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(
- final Map<RpcRoutingContext, Set<YangInstanceIdentifier>> changes) {
-
- final Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
- for (final RpcRoutingContext context : changes.keySet()){
- for (final YangInstanceIdentifier instanceId : changes.get(context)){
- final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
- routeIdSet.add(routeId);
- }
- }
- return routeIdSet;
- }
-}
return;
}
+ LOG.debug("Execute Rpc response received for rpc : {}, responding to sender : {}", msg.getRpc(), sender);
+
sender.tell(reply, self);
}
};
sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
message, errors)), self);
} else {
- final Node serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+ final Node serializedResultNode;
+ if(result.getResult() == null){
+ serializedResultNode = null;
+ } else {
+ serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+ }
+
+ LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
+
sender.tell(new RpcResponse(serializedResultNode), self);
}
}
final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
for (final DOMRpcIdentifier rpc : rpcs) {
- final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+ final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
routeIds.add(routeId);
}
final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
}
final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
for (final DOMRpcIdentifier rpc : rpcs) {
- final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), null);
+ final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
routeIds.add(routeId);
}
final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
import akka.japi.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private ActorRef rpcRegistry;
private final RemoteRpcProviderConfig config;
private RpcListener rpcListener;
- private RoutedRpcListener routeChangeListener;
private RemoteRpcImplementation rpcImplementation;
private final DOMRpcProviderService rpcProvisionRegistry;
private final DOMRpcService rpcServices;
LOG.debug("Registers rpc listeners");
rpcListener = new RpcListener(rpcRegistry);
- routeChangeListener = new RoutedRpcListener(rpcRegistry);
rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
rpcServices.registerRpcListener(rpcListener);
-// rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-// rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+ registerRoutedRpcDelegate();
announceSupportedRpcs();
}
+ private void registerRoutedRpcDelegate() {
+ Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
+ Set<Module> modules = schemaContext.getModules();
+ for(Module module : modules){
+ for(RpcDefinition rpcDefinition : module.getRpcs()){
+ if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
+ LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
+ rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
+ }
+ }
+ }
+ rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+ }
+
/**
* Add all the locally registered RPCs in the clustered routing table
*/
private void updateSchemaContext(final UpdateSchemaContext message) {
schemaContext = message.getSchemaContext();
+ registerRoutedRpcDelegate();
rpcBroker.tell(message, ActorRef.noSender());
}
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(final Throwable t) {
+ LOG.error("An exception happened actor will be resumed", t);
+
return SupervisorStrategy.resume();
}
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import com.typesafe.config.ConfigFactory;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-public class RouteRpcListenerTest {
-
- static ActorSystem system;
-
-
- @BeforeClass
- public static void setup() throws InterruptedException {
- system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
- @Test
- public void testRouteAdd() throws URISyntaxException, InterruptedException {
- new JavaTestKit(system) {
- {
- // Test announcements
- JavaTestKit probeReg = new JavaTestKit(system);
- ActorRef rpcRegistry = probeReg.getRef();
-
- RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry);
-
- QName qName = new QName(new URI("actor2"), "actor2");
- RpcRoutingContext context = RpcRoutingContext.create(qName, qName);
- YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName));
- rpcListener.onRouteChange(RoutingUtils.announcementChange(context, identifier));
-
- probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
- }};
- }
-
- @Test
- public void testRouteRemove() throws URISyntaxException, InterruptedException {
- new JavaTestKit(system) {
- {
- // Test announcements
- JavaTestKit probeReg = new JavaTestKit(system);
- ActorRef rpcRegistry = probeReg.getRef();
-
- RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry);
-
- QName qName = new QName(new URI("actor2"), "actor2");
- RpcRoutingContext context = RpcRoutingContext.create(qName, qName);
- YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName));
- rpcListener.onRouteChange(RoutingUtils.removalChange(context, identifier));
-
- probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
- }};
- }
-}