2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc;
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Function;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
19 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
21 import org.opendaylight.controller.sal.core.api.Broker;
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.duration.Duration;
30 * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
32 * It also starts the rpc listeners
35 public class RpcManager extends AbstractUntypedActor {
37 private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
39 private SchemaContext schemaContext;
40 private ActorRef rpcBroker;
41 private ActorRef rpcRegistry;
42 private final Broker.ProviderSession brokerSession;
43 private final RemoteRpcProviderConfig config;
44 private RpcListener rpcListener;
45 private RoutedRpcListener routeChangeListener;
46 private RemoteRpcImplementation rpcImplementation;
47 private final RpcProvisionRegistry rpcProvisionRegistry;
49 private RpcManager(SchemaContext schemaContext,
50 Broker.ProviderSession brokerSession,
51 RpcProvisionRegistry rpcProvisionRegistry) {
52 this.schemaContext = schemaContext;
53 this.brokerSession = brokerSession;
54 this.rpcProvisionRegistry = rpcProvisionRegistry;
55 this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
62 public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession,
63 final RpcProvisionRegistry rpcProvisionRegistry) {
64 return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry);
67 private void createRpcActors() {
68 LOG.debug("Create rpc registry and broker actors");
71 getContext().actorOf(Props.create(RpcRegistry.class).
72 withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
75 getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
76 withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
78 RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
79 rpcRegistry.tell(localRouter, self());
82 private void startListeners() {
83 LOG.debug("Registers rpc listeners");
85 rpcListener = new RpcListener(rpcRegistry);
86 routeChangeListener = new RoutedRpcListener(rpcRegistry);
87 rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
89 brokerSession.addRpcRegistrationListener(rpcListener);
90 rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
91 rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
92 announceSupportedRpcs();
96 * Add all the locally registered RPCs in the clustered routing table
98 private void announceSupportedRpcs(){
99 LOG.debug("Adding all supported rpcs to routing table");
100 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
101 for (QName rpc : currentlySupported) {
102 rpcListener.onRpcImplementationAdded(rpc);
108 protected void handleReceive(Object message) throws Exception {
109 if(message instanceof UpdateSchemaContext) {
110 updateSchemaContext((UpdateSchemaContext) message);
115 private void updateSchemaContext(UpdateSchemaContext message) {
116 this.schemaContext = message.getSchemaContext();
117 rpcBroker.tell(message, ActorRef.noSender());
121 public SupervisorStrategy supervisorStrategy() {
122 return new OneForOneStrategy(10, Duration.create("1 minute"),
123 new Function<Throwable, SupervisorStrategy.Directive>() {
125 public SupervisorStrategy.Directive apply(Throwable t) {
126 return SupervisorStrategy.resume();