4cbce63f9aa2a9b9f44f4c1ba3a27a225d4c2d42
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Function;
17 import java.util.Set;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
19 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
21 import org.opendaylight.controller.sal.core.api.Broker;
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.duration.Duration;
28
29 /**
30  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
31  *
32  * It also starts the rpc listeners
33  */
34
35 public class RpcManager extends AbstractUntypedActor {
36
37   private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
38
39   private SchemaContext schemaContext;
40   private ActorRef rpcBroker;
41   private ActorRef rpcRegistry;
42   private final Broker.ProviderSession brokerSession;
43   private final RemoteRpcProviderConfig config;
44   private RpcListener rpcListener;
45   private RoutedRpcListener routeChangeListener;
46   private RemoteRpcImplementation rpcImplementation;
47   private final RpcProvisionRegistry rpcProvisionRegistry;
48
49   private RpcManager(SchemaContext schemaContext,
50                      Broker.ProviderSession brokerSession,
51                      RpcProvisionRegistry rpcProvisionRegistry) {
52     this.schemaContext = schemaContext;
53     this.brokerSession = brokerSession;
54     this.rpcProvisionRegistry = rpcProvisionRegistry;
55     this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
56
57     createRpcActors();
58     startListeners();
59   }
60
61
62     public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession,
63             final RpcProvisionRegistry rpcProvisionRegistry) {
64         return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry);
65     }
66
67   private void createRpcActors() {
68     LOG.debug("Create rpc registry and broker actors");
69
70     rpcRegistry =
71             getContext().actorOf(Props.create(RpcRegistry.class).
72                 withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
73
74     rpcBroker =
75             getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
76                 withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
77
78     RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
79     rpcRegistry.tell(localRouter, self());
80   }
81
82   private void startListeners() {
83     LOG.debug("Registers rpc listeners");
84
85     rpcListener = new RpcListener(rpcRegistry);
86     routeChangeListener = new RoutedRpcListener(rpcRegistry);
87     rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
88
89     brokerSession.addRpcRegistrationListener(rpcListener);
90     rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
91     rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
92     announceSupportedRpcs();
93   }
94
95   /**
96    * Add all the locally registered RPCs in the clustered routing table
97    */
98   private void announceSupportedRpcs(){
99     LOG.debug("Adding all supported rpcs to routing table");
100     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
101     for (QName rpc : currentlySupported) {
102       rpcListener.onRpcImplementationAdded(rpc);
103     }
104   }
105
106
107   @Override
108   protected void handleReceive(Object message) throws Exception {
109     if(message instanceof UpdateSchemaContext) {
110       updateSchemaContext((UpdateSchemaContext) message);
111     }
112
113   }
114
115   private void updateSchemaContext(UpdateSchemaContext message) {
116     this.schemaContext = message.getSchemaContext();
117     rpcBroker.tell(message, ActorRef.noSender());
118   }
119
120   @Override
121   public SupervisorStrategy supervisorStrategy() {
122     return new OneForOneStrategy(10, Duration.create("1 minute"),
123         new Function<Throwable, SupervisorStrategy.Directive>() {
124           @Override
125           public SupervisorStrategy.Directive apply(Throwable t) {
126             return SupervisorStrategy.resume();
127           }
128         }
129     );
130   }
131 }