2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
/**
 * This class acts as a supervisor: it creates the RPC actors and resumes them if they throw an exception.
 *
 * <p>It also starts the RPC listeners.
 */
43 public class RpcManager extends AbstractUntypedActor {
45 private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
47 private SchemaContext schemaContext;
48 private ActorRef rpcBroker;
49 private ActorRef rpcRegistry;
50 private final RemoteRpcProviderConfig config;
51 private RpcListener rpcListener;
52 private RemoteRpcImplementation rpcImplementation;
53 private final DOMRpcProviderService rpcProvisionRegistry;
54 private final DOMRpcService rpcServices;
/**
     * Creates the manager actor. The constructor is private; its arguments are supplied
     * positionally through {@code Props.create} in {@link #props}, which also null-checks them.
     *
     * <p>NOTE(review): {@code createRpcActors()} and {@code startListeners()} are not invoked in
     * the visible portion of this class - confirm where actor start-up is triggered.
     *
     * @param schemaContext initial schema context
     * @param rpcProvisionRegistry service used to register the remote RPC implementation
     * @param rpcServices service used to register the RPC listener and handed to the broker
     */
    private RpcManager(final SchemaContext schemaContext,
                       final DOMRpcProviderService rpcProvisionRegistry,
                       final DOMRpcService rpcServices) {
        this.schemaContext = schemaContext;
        this.rpcProvisionRegistry = rpcProvisionRegistry;
        this.rpcServices = rpcServices;
        // Provider settings are read from the configuration of the owning actor system.
        this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
    }
69 public static Props props(final SchemaContext schemaContext,
70 final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
71 Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
72 Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
73 Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
74 return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
77 private void createRpcActors() {
78 LOG.debug("Create rpc registry and broker actors");
81 getContext().actorOf(RpcRegistry.props().
82 withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
85 getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
86 withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
88 final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
89 rpcRegistry.tell(localRouter, self());
92 private void startListeners() {
93 LOG.debug("Registers rpc listeners");
95 rpcListener = new RpcListener(rpcRegistry);
96 rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
98 rpcServices.registerRpcListener(rpcListener);
100 registerRoutedRpcDelegate();
101 announceSupportedRpcs();
104 private void registerRoutedRpcDelegate() {
105 Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
106 Set<Module> modules = schemaContext.getModules();
107 for(Module module : modules){
108 for(RpcDefinition rpcDefinition : module.getRpcs()){
109 if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
110 LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
111 rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
115 rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
119 * Add all the locally registered RPCs in the clustered routing table
121 private void announceSupportedRpcs(){
122 LOG.debug("Adding all supported rpcs to routing table");
123 final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
124 final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
125 for (final RpcDefinition rpcDef : currentlySupportedRpc) {
126 rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
128 rpcListener.onRpcAvailable(rpcs);
133 protected void handleReceive(final Object message) throws Exception {
134 if(message instanceof UpdateSchemaContext) {
135 updateSchemaContext((UpdateSchemaContext) message);
140 private void updateSchemaContext(final UpdateSchemaContext message) {
141 schemaContext = message.getSchemaContext();
142 registerRoutedRpcDelegate();
143 rpcBroker.tell(message, ActorRef.noSender());
147 public SupervisorStrategy supervisorStrategy() {
148 return new OneForOneStrategy(10, Duration.create("1 minute"),
149 new Function<Throwable, SupervisorStrategy.Directive>() {
151 public SupervisorStrategy.Directive apply(final Throwable t) {
152 LOG.error("An exception happened actor will be resumed", t);
154 return SupervisorStrategy.resume();