Fix CS warnings in sal-remoterpc-connector and enable enforcement
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.actor.SupervisorStrategy.Directive;
17 import akka.japi.Function;
18 import com.google.common.base.Preconditions;
19 import java.util.ArrayList;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
26 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
27 import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
28 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.model.api.Module;
32 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import scala.concurrent.duration.Duration;
35
36 /**
37  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also starts
38  * the rpc listeners
39  */
40
41 public class RpcManager extends AbstractUntypedActor {
42     private SchemaContext schemaContext;
43     private ActorRef rpcBroker;
44     private ActorRef rpcRegistry;
45     private final RemoteRpcProviderConfig config;
46     private RpcListener rpcListener;
47     private RemoteRpcImplementation rpcImplementation;
48     private final DOMRpcProviderService rpcProvisionRegistry;
49     private final DOMRpcService rpcServices;
50
51     private RpcManager(final SchemaContext schemaContext,
52                        final DOMRpcProviderService rpcProvisionRegistry,
53                        final DOMRpcService rpcSevices,
54                        final RemoteRpcProviderConfig config) {
55         this.schemaContext = schemaContext;
56         this.rpcProvisionRegistry = rpcProvisionRegistry;
57         rpcServices = rpcSevices;
58         this.config = config;
59
60         createRpcActors();
61         startListeners();
62     }
63
64
65     public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
66             final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
67         Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
68         Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
69         Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
70         return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
71     }
72
73     private void createRpcActors() {
74         LOG.debug("Create rpc registry and broker actors");
75
76         rpcRegistry = getContext().actorOf(RpcRegistry.props(config)
77                 .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
78
79         rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices)
80                 .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
81
82         final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
83         rpcRegistry.tell(localRouter, self());
84     }
85
86     private void startListeners() {
87         LOG.debug("Registers rpc listeners");
88
89         rpcListener = new RpcListener(rpcRegistry);
90         rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
91
92         rpcServices.registerRpcListener(rpcListener);
93
94         registerRoutedRpcDelegate();
95         announceSupportedRpcs();
96     }
97
98     private void registerRoutedRpcDelegate() {
99         final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
100         final Set<Module> modules = schemaContext.getModules();
101         for (final Module module : modules) {
102             for (final RpcDefinition rpcDefinition : module.getRpcs()) {
103                 if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
104                     LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
105                     rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
106                 }
107             }
108         }
109         rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
110     }
111
112     /**
113      * Add all the locally registered RPCs in the clustered routing table.
114      */
115     private void announceSupportedRpcs() {
116         LOG.debug("Adding all supported rpcs to routing table");
117         final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
118         final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
119         for (final RpcDefinition rpcDef : currentlySupportedRpc) {
120             rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
121         }
122
123         if (!rpcs.isEmpty()) {
124             rpcListener.onRpcAvailable(rpcs);
125         }
126     }
127
128
129     @Override
130     protected void handleReceive(final Object message) throws Exception {
131         if (message instanceof UpdateSchemaContext) {
132             updateSchemaContext((UpdateSchemaContext) message);
133         }
134     }
135
136     private void updateSchemaContext(final UpdateSchemaContext message) {
137         schemaContext = message.getSchemaContext();
138         registerRoutedRpcDelegate();
139         rpcBroker.tell(message, ActorRef.noSender());
140     }
141
142     @Override
143     public SupervisorStrategy supervisorStrategy() {
144         return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> {
145             LOG.error("An exception happened actor will be resumed", t);
146
147             return SupervisorStrategy.resume();
148         });
149     }
150 }