1ade84bd0fc1bee9fb8ad8db9ac33939cf591422
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Function;
17 import com.google.common.base.Preconditions;
18 import java.util.ArrayList;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Set;
22 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
23 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
26 import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
27 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.model.api.Module;
31 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.duration.Duration;
36
37 /**
38  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
39  *
40  * It also starts the rpc listeners
41  */
42
43 public class RpcManager extends AbstractUntypedActor {
44
45     private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
46
47     private SchemaContext schemaContext;
48     private ActorRef rpcBroker;
49     private ActorRef rpcRegistry;
50     private final RemoteRpcProviderConfig config;
51     private RpcListener rpcListener;
52     private RemoteRpcImplementation rpcImplementation;
53     private final DOMRpcProviderService rpcProvisionRegistry;
54     private final DOMRpcService rpcServices;
55
56     private RpcManager(final SchemaContext schemaContext,
57                        final DOMRpcProviderService rpcProvisionRegistry,
58                        final DOMRpcService rpcSevices) {
59         this.schemaContext = schemaContext;
60         this.rpcProvisionRegistry = rpcProvisionRegistry;
61         rpcServices = rpcSevices;
62         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
63
64         createRpcActors();
65         startListeners();
66     }
67
68
69       public static Props props(final SchemaContext schemaContext,
70               final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
71           Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
72           Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
73           Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
74           return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
75       }
76
77     private void createRpcActors() {
78         LOG.debug("Create rpc registry and broker actors");
79
80         rpcRegistry =
81                 getContext().actorOf(RpcRegistry.props().
82                     withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
83
84         rpcBroker =
85                 getContext().actorOf(RpcBroker.props(rpcServices).
86                     withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
87
88         final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
89         rpcRegistry.tell(localRouter, self());
90     }
91
92     private void startListeners() {
93         LOG.debug("Registers rpc listeners");
94
95         rpcListener = new RpcListener(rpcRegistry);
96         rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
97
98         rpcServices.registerRpcListener(rpcListener);
99
100         registerRoutedRpcDelegate();
101         announceSupportedRpcs();
102     }
103
104     private void registerRoutedRpcDelegate() {
105         final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
106         final Set<Module> modules = schemaContext.getModules();
107         for(final Module module : modules){
108             for(final RpcDefinition rpcDefinition : module.getRpcs()){
109                 if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
110                     LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
111                     rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
112                 }
113             }
114         }
115         rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
116     }
117
118     /**
119      * Add all the locally registered RPCs in the clustered routing table
120      */
121     private void announceSupportedRpcs(){
122         LOG.debug("Adding all supported rpcs to routing table");
123         final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
124         final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
125         for (final RpcDefinition rpcDef : currentlySupportedRpc) {
126             rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
127         }
128         if(!rpcs.isEmpty()) {
129             rpcListener.onRpcAvailable(rpcs);
130         }
131     }
132
133
134     @Override
135     protected void handleReceive(final Object message) throws Exception {
136       if(message instanceof UpdateSchemaContext) {
137         updateSchemaContext((UpdateSchemaContext) message);
138       }
139
140     }
141
142     private void updateSchemaContext(final UpdateSchemaContext message) {
143       schemaContext = message.getSchemaContext();
144       registerRoutedRpcDelegate();
145       rpcBroker.tell(message, ActorRef.noSender());
146     }
147
148     @Override
149     public SupervisorStrategy supervisorStrategy() {
150       return new OneForOneStrategy(10, Duration.create("1 minute"),
151           new Function<Throwable, SupervisorStrategy.Directive>() {
152             @Override
153             public SupervisorStrategy.Directive apply(final Throwable t) {
154               LOG.error("An exception happened actor will be resumed", t);
155
156               return SupervisorStrategy.resume();
157             }
158           }
159       );
160     }
161 }