/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.remote.rpc;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
15 import org.opendaylight.controller.remote.rpc.registry.ClusterWrapperImpl;
16 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
17 import org.opendaylight.controller.sal.core.api.Broker;
18 import org.opendaylight.controller.sal.core.api.Provider;
19 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
20 import org.opendaylight.controller.sal.core.api.model.SchemaService;
21 import org.opendaylight.yangtools.yang.common.QName;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 import java.util.Collection;
30 * This is the base class which initialize all the actors, listeners and
31 * default RPc implementation so remote invocation of rpcs.
33 public class RemoteRpcProvider implements AutoCloseable, Provider{
35 private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
37 private final ActorSystem actorSystem;
38 private ActorRef rpcBroker;
39 private ActorRef rpcRegistry;
40 private final RpcProvisionRegistry rpcProvisionRegistry;
41 private Broker.ProviderSession brokerSession;
42 private RpcListener rpcListener;
43 private RoutedRpcListener routeChangeListener;
44 private RemoteRpcImplementation rpcImplementation;
45 public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
46 this.actorSystem = actorSystem;
47 this.rpcProvisionRegistry = rpcProvisionRegistry;
51 public void close() throws Exception {
52 this.actorSystem.shutdown();
53 unregisterSupportedRpcs();
54 unregisterSupportedRoutedRpcs();
58 public void onSessionInitiated(Broker.ProviderSession session) {
59 this.brokerSession = session;
64 public Collection<ProviderFunctionality> getProviderFunctionality() {
68 private void start() {
69 LOG.debug("Starting all rpc listeners.");
70 // Create actor to handle and sync routing table in cluster
71 ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
72 rpcRegistry = actorSystem.actorOf(RpcRegistry.props(clusterWrapper), "rpc-registry");
74 // Create actor to invoke and execute rpc
75 SchemaService schemaService = brokerSession.getService(SchemaService.class);
76 SchemaContext schemaContext = schemaService.getGlobalContext();
77 rpcBroker = actorSystem.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "rpc-broker");
78 String rpcBrokerPath = clusterWrapper.getAddress().toString() + "/user/rpc-broker";
79 rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath);
80 routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath);
81 rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
82 brokerSession.addRpcRegistrationListener(rpcListener);
83 rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
84 rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
85 announceSupportedRpcs();
86 announceSupportedRoutedRpcs();
91 * Add all the locally registered RPCs in the clustered routing table
93 private void announceSupportedRpcs(){
94 LOG.debug("Adding all supported rpcs to routing table");
95 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
96 for (QName rpc : currentlySupported) {
97 rpcListener.onRpcImplementationAdded(rpc);
102 * Add all the locally registered Routed RPCs in the clustered routing table
104 private void announceSupportedRoutedRpcs(){
106 //TODO: announce all routed RPCs as well
111 * Un-Register all the supported RPCs from clustered routing table
113 private void unregisterSupportedRpcs(){
114 LOG.debug("removing all supported rpcs to routing table");
115 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
116 for (QName rpc : currentlySupported) {
117 rpcListener.onRpcImplementationRemoved(rpc);
122 * Un-Register all the locally supported Routed RPCs from clustered routing table
124 private void unregisterSupportedRoutedRpcs(){
126 //TODO: remove all routed RPCs as well