2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc;
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Creator;
17 import akka.japi.Function;
18 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
19 import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
21 import org.opendaylight.controller.sal.core.api.Broker;
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.duration.Duration;
31 * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
33 * It also starts the rpc listeners
36 public class RpcManager extends AbstractUntypedActor {
38 private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
40 private SchemaContext schemaContext;
41 private final ClusterWrapper clusterWrapper;
42 private ActorRef rpcBroker;
43 private ActorRef rpcRegistry;
44 private final Broker.ProviderSession brokerSession;
45 private RpcListener rpcListener;
46 private RoutedRpcListener routeChangeListener;
47 private RemoteRpcImplementation rpcImplementation;
48 private final RpcProvisionRegistry rpcProvisionRegistry;
50 private RpcManager(ClusterWrapper clusterWrapper, SchemaContext schemaContext,
51 Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
52 this.clusterWrapper = clusterWrapper;
53 this.schemaContext = schemaContext;
54 this.brokerSession = brokerSession;
55 this.rpcProvisionRegistry = rpcProvisionRegistry;
62 public static Props props(final ClusterWrapper clusterWrapper, final SchemaContext schemaContext,
63 final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
64 return Props.create(new Creator<RpcManager>() {
66 public RpcManager create() throws Exception {
67 return new RpcManager(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry);
72 private void createRpcActors() {
73 LOG.debug("Create rpc registry and broker actors");
75 rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
76 rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
79 private void startListeners() {
80 LOG.debug("Registers rpc listeners");
82 String rpcBrokerPath = clusterWrapper.getAddress().toString() + ActorConstants.RPC_BROKER_PATH;
83 rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath);
84 routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath);
85 rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
87 brokerSession.addRpcRegistrationListener(rpcListener);
88 rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
89 rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
90 announceSupportedRpcs();
94 * Add all the locally registered RPCs in the clustered routing table
96 private void announceSupportedRpcs(){
97 LOG.debug("Adding all supported rpcs to routing table");
98 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
99 for (QName rpc : currentlySupported) {
100 rpcListener.onRpcImplementationAdded(rpc);
106 protected void handleReceive(Object message) throws Exception {
107 if(message instanceof UpdateSchemaContext) {
108 updateSchemaContext((UpdateSchemaContext) message);
113 private void updateSchemaContext(UpdateSchemaContext message) {
114 this.schemaContext = message.getSchemaContext();
118 public SupervisorStrategy supervisorStrategy() {
119 return new OneForOneStrategy(10, Duration.create("1 minute"),
120 new Function<Throwable, SupervisorStrategy.Directive>() {
122 public SupervisorStrategy.Directive apply(Throwable t) {
123 return SupervisorStrategy.resume();