Teach sal-remoterpc-connector to route actions
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / OpsManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
19 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
20 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
21 import org.opendaylight.mdsal.dom.api.DOMActionService;
22 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
23 import org.opendaylight.mdsal.dom.api.DOMRpcService;
24 import org.opendaylight.yangtools.concepts.ListenerRegistration;
25 import scala.concurrent.duration.FiniteDuration;
26
27 /**
28  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
29  * {@link OpsListener} with the local {@link DOMRpcService}.
30  */
31 public class OpsManager extends AbstractUntypedActor {
32     private final DOMRpcProviderService rpcProvisionRegistry;
33     private final RemoteOpsProviderConfig config;
34     private final DOMRpcService rpcServices;
35     private DOMActionProviderService actionProvisionRegistry;
36     private DOMActionService actionService;
37
38     private ListenerRegistration<OpsListener> listenerReg;
39     private ActorRef opsInvoker;
40     private ActorRef actionRegistry;
41     private ActorRef rpcRegistry;
42     private ActorRef opsRegistrar;
43
44     OpsManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
45                final RemoteOpsProviderConfig config, final DOMActionProviderService actionProviderService,
46                final DOMActionService actionService) {
47         this.rpcProvisionRegistry = requireNonNull(rpcProvisionRegistry);
48         this.rpcServices = requireNonNull(rpcServices);
49         this.config = requireNonNull(config);
50         this.actionProvisionRegistry = requireNonNull(actionProviderService);
51         this.actionService = requireNonNull(actionService);
52     }
53
54     public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
55                               final RemoteOpsProviderConfig config,
56                               final DOMActionProviderService actionProviderService,
57                               final DOMActionService actionService) {
58         requireNonNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
59         requireNonNull(rpcServices, "RpcService can not be null!");
60         requireNonNull(config, "RemoteOpsProviderConfig can not be null!");
61         requireNonNull(actionProviderService, "ActionProviderService can not be null!");
62         requireNonNull(actionService, "ActionService can not be null!");
63         return Props.create(OpsManager.class, rpcProvisionRegistry, rpcServices, config,
64                 actionProviderService, actionService);
65     }
66
67     @Override
68     public void preStart() throws Exception {
69         super.preStart();
70
71         opsInvoker = getContext().actorOf(OpsInvoker.props(rpcServices, actionService)
72                 .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
73         LOG.debug("Listening for RPC invocation requests with {}", opsInvoker);
74
75         opsRegistrar = getContext().actorOf(OpsRegistrar.props(config, rpcProvisionRegistry, actionProvisionRegistry)
76                 .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
77         LOG.debug("Registering remote RPCs with {}", opsRegistrar);
78
79         rpcRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar)
80                 .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
81         LOG.debug("Propagating RPC information with {}", rpcRegistry);
82
83         actionRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar)
84                 .withMailbox(config.getMailBoxName()), config.getActionRegistryName());
85         LOG.debug("Propagating RPC information with {}", actionRegistry);
86
87         final OpsListener opsListener = new OpsListener(rpcRegistry, actionRegistry);
88         LOG.debug("Registering local availability listener {}", opsListener);
89         listenerReg = rpcServices.registerRpcListener(opsListener);
90     }
91
92     @Override
93     public void postStop() throws Exception {
94         if (listenerReg != null) {
95             listenerReg.close();
96             listenerReg = null;
97         }
98
99         super.postStop();
100     }
101
102     @Override
103     protected void handleReceive(final Object message) {
104         unknownMessage(message);
105     }
106
107     @Override
108     public SupervisorStrategy supervisorStrategy() {
109         return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
110             LOG.error("An exception happened actor will be resumed", t);
111             return SupervisorStrategy.resume();
112         });
113     }
114 }