2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc;
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Creator;
17 import akka.japi.Function;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
19 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
21 import org.opendaylight.controller.sal.core.api.Broker;
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.duration.Duration;
/**
 * This class acts as a supervisor: it creates all the child actors and resumes them
 * if an exception is thrown.
 *
 * It also starts the rpc listeners.
 */
37 public class RpcManager extends AbstractUntypedActor {
39 private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
41 private SchemaContext schemaContext;
42 private ActorRef rpcBroker;
43 private ActorRef rpcRegistry;
44 private final Broker.ProviderSession brokerSession;
45 private final RemoteRpcProviderConfig config;
46 private RpcListener rpcListener;
47 private RoutedRpcListener routeChangeListener;
48 private RemoteRpcImplementation rpcImplementation;
49 private final RpcProvisionRegistry rpcProvisionRegistry;
/**
 * Creates the manager: stores the collaborators, reads the remote-rpc settings from the
 * actor system configuration, then creates the rpc actors and registers the rpc listeners.
 *
 * The constructor is private; actor instances are described by {@code props(...)}.
 *
 * @param schemaContext        schema context handed to the rpc broker and the remote rpc implementation
 * @param brokerSession        provider session used to register the rpc listener and to query supported rpcs
 * @param rpcProvisionRegistry registry on which the route change listener and the default delegate are set
 */
private RpcManager(final SchemaContext schemaContext,
                   final Broker.ProviderSession brokerSession,
                   final RpcProvisionRegistry rpcProvisionRegistry) {
    this.schemaContext = schemaContext;
    this.brokerSession = brokerSession;
    this.rpcProvisionRegistry = rpcProvisionRegistry;
    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());

    // Both methods are private and have no other caller in this class, so they must be
    // started here. The actors come first because the listeners are built from their references.
    createRpcActors();
    startListeners();
}
64 public static Props props(final SchemaContext schemaContext,
65 final Broker.ProviderSession brokerSession,
66 final RpcProvisionRegistry rpcProvisionRegistry) {
67 return Props.create(new Creator<RpcManager>() {
69 public RpcManager create() throws Exception {
70 return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
75 private void createRpcActors() {
76 LOG.debug("Create rpc registry and broker actors");
79 getContext().actorOf(Props.create(RpcRegistry.class).
80 withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
83 getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
84 withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
86 RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
87 rpcRegistry.tell(localRouter, self());
90 private void startListeners() {
91 LOG.debug("Registers rpc listeners");
93 rpcListener = new RpcListener(rpcRegistry);
94 routeChangeListener = new RoutedRpcListener(rpcRegistry);
95 rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
97 brokerSession.addRpcRegistrationListener(rpcListener);
98 rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
99 rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
100 announceSupportedRpcs();
104 * Add all the locally registered RPCs in the clustered routing table
106 private void announceSupportedRpcs(){
107 LOG.debug("Adding all supported rpcs to routing table");
108 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
109 for (QName rpc : currentlySupported) {
110 rpcListener.onRpcImplementationAdded(rpc);
116 protected void handleReceive(Object message) throws Exception {
117 if(message instanceof UpdateSchemaContext) {
118 updateSchemaContext((UpdateSchemaContext) message);
123 private void updateSchemaContext(UpdateSchemaContext message) {
124 this.schemaContext = message.getSchemaContext();
128 public SupervisorStrategy supervisorStrategy() {
129 return new OneForOneStrategy(10, Duration.create("1 minute"),
130 new Function<Throwable, SupervisorStrategy.Directive>() {
132 public SupervisorStrategy.Directive apply(Throwable t) {
133 return SupervisorStrategy.resume();