Add support for reusable streaming
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11 import akka.actor.ActorRef;
12 import akka.actor.OneForOneStrategy;
13 import akka.actor.Props;
14 import akka.actor.SupervisorStrategy;
15 import com.google.common.base.Preconditions;
16 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
18 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
19 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
20 import org.opendaylight.mdsal.dom.api.DOMRpcService;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import scala.concurrent.duration.FiniteDuration;
23
24 /**
25  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
26  * {@link RpcListener} with the local {@link DOMRpcService}.
27  */
28 public class RpcManager extends AbstractUntypedActor {
29     private final DOMRpcProviderService rpcProvisionRegistry;
30     private final RemoteRpcProviderConfig config;
31     private final DOMRpcService rpcServices;
32
33     private ListenerRegistration<RpcListener> listenerReg;
34     private ActorRef rpcInvoker;
35     private ActorRef rpcRegistry;
36     private ActorRef rpcRegistrar;
37
38     RpcManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
39             final RemoteRpcProviderConfig config) {
40         this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
41         this.rpcServices = Preconditions.checkNotNull(rpcServices);
42         this.config = Preconditions.checkNotNull(config);
43     }
44
45     public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
46             final RemoteRpcProviderConfig config) {
47         Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
48         Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
49         Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
50         return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
51     }
52
53     @Override
54     public void preStart() throws Exception {
55         super.preStart();
56
57         rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
58             .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
59         LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
60
61         rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
62             .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
63         LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
64
65         rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
66                 .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
67         LOG.debug("Propagating RPC information with {}", rpcRegistry);
68
69         final RpcListener rpcListener = new RpcListener(rpcRegistry);
70         LOG.debug("Registering local availabitility listener {}", rpcListener);
71         listenerReg = rpcServices.registerRpcListener(rpcListener);
72     }
73
74     @Override
75     public void postStop() throws Exception {
76         if (listenerReg != null) {
77             listenerReg.close();
78             listenerReg = null;
79         }
80
81         super.postStop();
82     }
83
84     @Override
85     protected void handleReceive(final Object message) {
86         unknownMessage(message);
87     }
88
89     @Override
90     public SupervisorStrategy supervisorStrategy() {
91         return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
92             LOG.error("An exception happened actor will be resumed", t);
93             return SupervisorStrategy.resume();
94         });
95     }
96 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.