Merge "BUG 1082 Migrate sal-rest-connector to Async Data Broker API"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Creator;
17 import akka.japi.Function;
18 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
19 import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
21 import org.opendaylight.controller.sal.core.api.Broker;
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.duration.Duration;
28 import java.util.Set;
29
30 /**
31  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
32  *
33  * It also starts the rpc listeners
34  */
35
36 public class RpcManager extends AbstractUntypedActor {
37
38   private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
39
40   private SchemaContext schemaContext;
41   private final ClusterWrapper clusterWrapper;
42   private ActorRef rpcBroker;
43   private ActorRef rpcRegistry;
44   private final Broker.ProviderSession brokerSession;
45   private RpcListener rpcListener;
46   private RoutedRpcListener routeChangeListener;
47   private RemoteRpcImplementation rpcImplementation;
48   private final RpcProvisionRegistry rpcProvisionRegistry;
49
50   private RpcManager(ClusterWrapper clusterWrapper, SchemaContext schemaContext,
51                      Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
52     this.clusterWrapper = clusterWrapper;
53     this.schemaContext = schemaContext;
54     this.brokerSession = brokerSession;
55     this.rpcProvisionRegistry = rpcProvisionRegistry;
56
57     createRpcActors();
58     startListeners();
59   }
60
61
62   public static Props props(final ClusterWrapper clusterWrapper, final SchemaContext schemaContext,
63                             final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
64     return Props.create(new Creator<RpcManager>() {
65       @Override
66       public RpcManager create() throws Exception {
67         return new RpcManager(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry);
68       }
69     });
70   }
71
72   private void createRpcActors() {
73     LOG.debug("Create rpc registry and broker actors");
74
75     rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
76     rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
77   }
78
79   private void startListeners() {
80     LOG.debug("Registers rpc listeners");
81
82     String rpcBrokerPath = clusterWrapper.getAddress().toString() + ActorConstants.RPC_BROKER_PATH;
83     rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath);
84     routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath);
85     rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
86
87     brokerSession.addRpcRegistrationListener(rpcListener);
88     rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
89     rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
90     announceSupportedRpcs();
91   }
92
93   /**
94    * Add all the locally registered RPCs in the clustered routing table
95    */
96   private void announceSupportedRpcs(){
97     LOG.debug("Adding all supported rpcs to routing table");
98     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
99     for (QName rpc : currentlySupported) {
100       rpcListener.onRpcImplementationAdded(rpc);
101     }
102   }
103
104
105   @Override
106   protected void handleReceive(Object message) throws Exception {
107     if(message instanceof UpdateSchemaContext) {
108       updateSchemaContext((UpdateSchemaContext) message);
109     }
110
111   }
112
113   private void updateSchemaContext(UpdateSchemaContext message) {
114     this.schemaContext = message.getSchemaContext();
115   }
116
117   @Override
118   public SupervisorStrategy supervisorStrategy() {
119     return new OneForOneStrategy(10, Duration.create("1 minute"),
120         new Function<Throwable, SupervisorStrategy.Directive>() {
121           @Override
122           public SupervisorStrategy.Directive apply(Throwable t) {
123             return SupervisorStrategy.resume();
124           }
125         }
126     );
127   }
128 }