/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.remote.rpc;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.japi.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
/**
 * This class acts as a supervisor: it creates all the actors and resumes them if an exception is thrown. It also
 * starts the rpc listeners.
 */
41 public class RpcManager extends AbstractUntypedActor {
42 private SchemaContext schemaContext;
43 private ActorRef rpcBroker;
44 private ActorRef rpcRegistry;
45 private final RemoteRpcProviderConfig config;
46 private RpcListener rpcListener;
47 private RemoteRpcImplementation rpcImplementation;
48 private final DOMRpcProviderService rpcProvisionRegistry;
49 private final DOMRpcService rpcServices;
/**
 * Stores the collaborators and immediately brings up the child actors and the RPC listeners.
 *
 * <p>The constructor is private; {@link #props} passes the same four arguments to {@code Props.create}.
 *
 * @param schemaContext schema context whose modules and operations are scanned for RPC definitions
 * @param rpcProvisionRegistry service the remote RPC implementation is registered with
 * @param rpcSevices service handed to the RPC broker actor and used to register the RPC listener
 * @param config provider configuration supplying the actor names and the mailbox name
 */
private RpcManager(final SchemaContext schemaContext,
        final DOMRpcProviderService rpcProvisionRegistry,
        final DOMRpcService rpcSevices,
        final RemoteRpcProviderConfig config) {
    this.schemaContext = schemaContext;
    this.rpcProvisionRegistry = rpcProvisionRegistry;
    rpcServices = rpcSevices;
    // 'config' is a final field read by createRpcActors() and startListeners(); it must be set before they run.
    this.config = config;

    // Without these calls rpcRegistry, rpcBroker, rpcListener and rpcImplementation would stay null.
    createRpcActors();
    startListeners();
}
65 public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
66 final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
67 Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
68 Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
69 Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
70 return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
73 private void createRpcActors() {
74 LOG.debug("Create rpc registry and broker actors");
76 rpcRegistry = getContext().actorOf(RpcRegistry.props(config)
77 .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
79 rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices)
80 .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
82 final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
83 rpcRegistry.tell(localRouter, self());
86 private void startListeners() {
87 LOG.debug("Registers rpc listeners");
89 rpcListener = new RpcListener(rpcRegistry);
90 rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
92 rpcServices.registerRpcListener(rpcListener);
94 registerRoutedRpcDelegate();
95 announceSupportedRpcs();
98 private void registerRoutedRpcDelegate() {
99 final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
100 final Set<Module> modules = schemaContext.getModules();
101 for (final Module module : modules) {
102 for (final RpcDefinition rpcDefinition : module.getRpcs()) {
103 if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
104 LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
105 rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
109 rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
113 * Add all the locally registered RPCs in the clustered routing table.
115 private void announceSupportedRpcs() {
116 LOG.debug("Adding all supported rpcs to routing table");
117 final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
118 final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
119 for (final RpcDefinition rpcDef : currentlySupportedRpc) {
120 rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
123 if (!rpcs.isEmpty()) {
124 rpcListener.onRpcAvailable(rpcs);
130 protected void handleReceive(final Object message) throws Exception {
131 if (message instanceof UpdateSchemaContext) {
132 updateSchemaContext((UpdateSchemaContext) message);
136 private void updateSchemaContext(final UpdateSchemaContext message) {
137 schemaContext = message.getSchemaContext();
138 registerRoutedRpcDelegate();
139 rpcBroker.tell(message, ActorRef.noSender());
143 public SupervisorStrategy supervisorStrategy() {
144 return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> {
145 LOG.error("An exception happened actor will be resumed", t);
147 return SupervisorStrategy.resume();