Merge "BUG 1595 - Clustering : NPE on startup"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Creator;
17 import akka.japi.Function;
18 import com.typesafe.config.Config;
19 import com.typesafe.config.ConfigFactory;
20 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
21 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
22 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
23 import org.opendaylight.controller.sal.core.api.Broker;
24 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.Duration;
30 import java.util.Set;
31
32 /**
33  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
34  *
35  * It also starts the rpc listeners
36  */
37
38 public class RpcManager extends AbstractUntypedActor {
39
40   private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
41
42   private SchemaContext schemaContext;
43   private ActorRef rpcBroker;
44   private ActorRef rpcRegistry;
45   private final Broker.ProviderSession brokerSession;
46   private RpcListener rpcListener;
47   private RoutedRpcListener routeChangeListener;
48   private RemoteRpcImplementation rpcImplementation;
49   private final RpcProvisionRegistry rpcProvisionRegistry;
50
51   private RpcManager(SchemaContext schemaContext,
52                      Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
53     this.schemaContext = schemaContext;
54     this.brokerSession = brokerSession;
55     this.rpcProvisionRegistry = rpcProvisionRegistry;
56
57     createRpcActors();
58     startListeners();
59   }
60
61
62   public static Props props(final SchemaContext schemaContext,
63                             final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
64     return Props.create(new Creator<RpcManager>() {
65       @Override
66       public RpcManager create() throws Exception {
67         return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
68       }
69     });
70   }
71
72   private void createRpcActors() {
73     LOG.debug("Create rpc registry and broker actors");
74
75       Config conf = ConfigFactory.load();
76
77     rpcRegistry =
78             getContext().actorOf(Props.create(RpcRegistry.class).
79                 withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY);
80
81     rpcBroker =
82             getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
83                 withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER);
84
85     RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
86     rpcRegistry.tell(localRouter, self());
87   }
88
89   private void startListeners() {
90     LOG.debug("Registers rpc listeners");
91
92     rpcListener = new RpcListener(rpcRegistry);
93     routeChangeListener = new RoutedRpcListener(rpcRegistry);
94     rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
95
96     brokerSession.addRpcRegistrationListener(rpcListener);
97     rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
98     rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
99     announceSupportedRpcs();
100   }
101
102   /**
103    * Add all the locally registered RPCs in the clustered routing table
104    */
105   private void announceSupportedRpcs(){
106     LOG.debug("Adding all supported rpcs to routing table");
107     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
108     for (QName rpc : currentlySupported) {
109       rpcListener.onRpcImplementationAdded(rpc);
110     }
111   }
112
113
114   @Override
115   protected void handleReceive(Object message) throws Exception {
116     if(message instanceof UpdateSchemaContext) {
117       updateSchemaContext((UpdateSchemaContext) message);
118     }
119
120   }
121
122   private void updateSchemaContext(UpdateSchemaContext message) {
123     this.schemaContext = message.getSchemaContext();
124   }
125
126   @Override
127   public SupervisorStrategy supervisorStrategy() {
128     return new OneForOneStrategy(10, Duration.create("1 minute"),
129         new Function<Throwable, SupervisorStrategy.Directive>() {
130           @Override
131           public SupervisorStrategy.Directive apply(Throwable t) {
132             return SupervisorStrategy.resume();
133           }
134         }
135     );
136   }
137 }