BUG 4151 : Create a shared actor system
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Function;
17 import com.google.common.base.Preconditions;
18 import java.util.ArrayList;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Set;
22 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
23 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
26 import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
27 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.model.api.Module;
31 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.duration.Duration;
36
37 /**
38  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
39  *
40  * It also starts the rpc listeners
41  */
42
43 public class RpcManager extends AbstractUntypedActor {
44
45     private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
46
47     private SchemaContext schemaContext;
48     private ActorRef rpcBroker;
49     private ActorRef rpcRegistry;
50     private final RemoteRpcProviderConfig config;
51     private RpcListener rpcListener;
52     private RemoteRpcImplementation rpcImplementation;
53     private final DOMRpcProviderService rpcProvisionRegistry;
54     private final DOMRpcService rpcServices;
55
56     private RpcManager(final SchemaContext schemaContext,
57                        final DOMRpcProviderService rpcProvisionRegistry,
58                        final DOMRpcService rpcSevices,
59                        final RemoteRpcProviderConfig config) {
60         this.schemaContext = schemaContext;
61         this.rpcProvisionRegistry = rpcProvisionRegistry;
62         rpcServices = rpcSevices;
63         this.config = config;
64
65         createRpcActors();
66         startListeners();
67     }
68
69
70       public static Props props(final SchemaContext schemaContext,
71               final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
72               final RemoteRpcProviderConfig config) {
73           Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
74           Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
75           Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
76           return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
77       }
78
79     private void createRpcActors() {
80         LOG.debug("Create rpc registry and broker actors");
81
82         rpcRegistry =
83                 getContext().actorOf(RpcRegistry.props(config).
84                     withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
85
86         rpcBroker =
87                 getContext().actorOf(RpcBroker.props(rpcServices).
88                     withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
89
90         final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
91         rpcRegistry.tell(localRouter, self());
92     }
93
94     private void startListeners() {
95         LOG.debug("Registers rpc listeners");
96
97         rpcListener = new RpcListener(rpcRegistry);
98         rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
99
100         rpcServices.registerRpcListener(rpcListener);
101
102         registerRoutedRpcDelegate();
103         announceSupportedRpcs();
104     }
105
106     private void registerRoutedRpcDelegate() {
107         final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
108         final Set<Module> modules = schemaContext.getModules();
109         for(final Module module : modules){
110             for(final RpcDefinition rpcDefinition : module.getRpcs()){
111                 if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
112                     LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
113                     rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
114                 }
115             }
116         }
117         rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
118     }
119
120     /**
121      * Add all the locally registered RPCs in the clustered routing table
122      */
123     private void announceSupportedRpcs(){
124         LOG.debug("Adding all supported rpcs to routing table");
125         final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
126         final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
127         for (final RpcDefinition rpcDef : currentlySupportedRpc) {
128             rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
129         }
130         if(!rpcs.isEmpty()) {
131             rpcListener.onRpcAvailable(rpcs);
132         }
133     }
134
135
136     @Override
137     protected void handleReceive(final Object message) throws Exception {
138       if(message instanceof UpdateSchemaContext) {
139         updateSchemaContext((UpdateSchemaContext) message);
140       }
141
142     }
143
144     private void updateSchemaContext(final UpdateSchemaContext message) {
145       schemaContext = message.getSchemaContext();
146       registerRoutedRpcDelegate();
147       rpcBroker.tell(message, ActorRef.noSender());
148     }
149
150     @Override
151     public SupervisorStrategy supervisorStrategy() {
152       return new OneForOneStrategy(10, Duration.create("1 minute"),
153           new Function<Throwable, SupervisorStrategy.Directive>() {
154             @Override
155             public SupervisorStrategy.Directive apply(final Throwable t) {
156               LOG.error("An exception happened actor will be resumed", t);
157
158               return SupervisorStrategy.resume();
159             }
160           }
161       );
162     }
163 }