Merge "Make idle timeout configurable in ssh proxy server"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Creator;
17 import akka.japi.Function;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
19 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
21 import org.opendaylight.controller.sal.core.api.Broker;
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.duration.Duration;
28
29 import java.util.Set;
30
31 /**
32  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
33  *
34  * It also starts the rpc listeners
35  */
36
37 public class RpcManager extends AbstractUntypedActor {
38
39   private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
40
41   private SchemaContext schemaContext;
42   private ActorRef rpcBroker;
43   private ActorRef rpcRegistry;
44   private final Broker.ProviderSession brokerSession;
45   private final RemoteRpcProviderConfig config;
46   private RpcListener rpcListener;
47   private RoutedRpcListener routeChangeListener;
48   private RemoteRpcImplementation rpcImplementation;
49   private final RpcProvisionRegistry rpcProvisionRegistry;
50
51   private RpcManager(SchemaContext schemaContext,
52                      Broker.ProviderSession brokerSession,
53                      RpcProvisionRegistry rpcProvisionRegistry) {
54     this.schemaContext = schemaContext;
55     this.brokerSession = brokerSession;
56     this.rpcProvisionRegistry = rpcProvisionRegistry;
57     this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
58
59     createRpcActors();
60     startListeners();
61   }
62
63
64   public static Props props(final SchemaContext schemaContext,
65                             final Broker.ProviderSession brokerSession,
66                             final RpcProvisionRegistry rpcProvisionRegistry) {
67     return Props.create(new Creator<RpcManager>() {
68       private static final long serialVersionUID = 1L;
69       @Override
70       public RpcManager create() throws Exception {
71         return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
72       }
73     });
74   }
75
76   private void createRpcActors() {
77     LOG.debug("Create rpc registry and broker actors");
78
79     rpcRegistry =
80             getContext().actorOf(Props.create(RpcRegistry.class).
81                 withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
82
83     rpcBroker =
84             getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
85                 withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
86
87     RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
88     rpcRegistry.tell(localRouter, self());
89   }
90
91   private void startListeners() {
92     LOG.debug("Registers rpc listeners");
93
94     rpcListener = new RpcListener(rpcRegistry);
95     routeChangeListener = new RoutedRpcListener(rpcRegistry);
96     rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
97
98     brokerSession.addRpcRegistrationListener(rpcListener);
99     rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
100     rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
101     announceSupportedRpcs();
102   }
103
104   /**
105    * Add all the locally registered RPCs in the clustered routing table
106    */
107   private void announceSupportedRpcs(){
108     LOG.debug("Adding all supported rpcs to routing table");
109     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
110     for (QName rpc : currentlySupported) {
111       rpcListener.onRpcImplementationAdded(rpc);
112     }
113   }
114
115
116   @Override
117   protected void handleReceive(Object message) throws Exception {
118     if(message instanceof UpdateSchemaContext) {
119       updateSchemaContext((UpdateSchemaContext) message);
120     }
121
122   }
123
124   private void updateSchemaContext(UpdateSchemaContext message) {
125     this.schemaContext = message.getSchemaContext();
126   }
127
128   @Override
129   public SupervisorStrategy supervisorStrategy() {
130     return new OneForOneStrategy(10, Duration.create("1 minute"),
131         new Function<Throwable, SupervisorStrategy.Directive>() {
132           @Override
133           public SupervisorStrategy.Directive apply(Throwable t) {
134             return SupervisorStrategy.resume();
135           }
136         }
137     );
138   }
139 }