2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.dom.broker;
10 import com.google.common.util.concurrent.ListenableFuture
11 import java.util.Collections
12 import java.util.HashSet
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
15 import org.opendaylight.controller.sal.core.api.Broker
16 import org.opendaylight.controller.sal.core.api.Consumer
17 import org.opendaylight.controller.sal.core.api.Provider
18 import org.opendaylight.yangtools.yang.common.QName
19 import org.opendaylight.yangtools.yang.common.RpcResult
20 import org.opendaylight.yangtools.yang.data.api.CompositeNode
21 import org.osgi.framework.BundleContext
22 import org.slf4j.LoggerFactory
23 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
24 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
25 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
26 import org.opendaylight.controller.sal.core.api.RpcImplementation
27 import org.opendaylight.controller.sal.core.api.RpcRoutingContext
28 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
29 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
// Broker implementation that also serves as the RPC provision registry and is
// AutoCloseable. It keeps track of consumer and provider sessions and holds
// the RpcRouter that the RPC-related methods of this class delegate to.
31 public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
// Class-wide SLF4J logger.
32 private static val log = LoggerFactory.getLogger(BrokerImpl);
34 // Broker Generic Context
// Sessions of registered consumers, held in a synchronized HashSet wrapper.
35 private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
// Sessions of registered providers, held in a synchronized HashSet wrapper.
36 private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
37 new HashSet<ProviderContextImpl>());
// Bundle context of the broker. NOTE(review): nothing visible in this extract
// reads or assigns this field - confirm how it is populated.
40 private var BundleContext bundleContext;
// Resource that close() releases when it is non-null.
43 private var AutoCloseable deactivator;
// Router that performs the actual RPC registration and invocation work.
// NOTE(review): no assignment is visible in this extract - presumably it is
// injected through an accessor; confirm against the full file.
46 private var RpcRouter router;
// Registers a consumer with the broker: validates it with checkPredicates,
// creates a session for it, passes that session to the consumer through
// onSessionInitiated, and finally records the session in `sessions`.
// NOTE(review): no return statement is visible in this extract (the embedded
// numbering jumps from 53 to 57) - presumably the session is returned; confirm.
48 override registerConsumer(Consumer consumer, BundleContext ctx) {
49 checkPredicates(consumer);
50 log.trace("Registering consumer " + consumer);
51 val session = newSessionFor(consumer, ctx);
// The consumer is notified before the session is added to the tracked set.
52 consumer.onSessionInitiated(session);
53 sessions.add(session);
// Registers a provider with the broker: validates it with checkPredicates,
// creates a session for it, passes that session to the provider through
// onSessionInitiated, and finally records the session in `providerSessions`.
// NOTE(review): unlike registerConsumer, no trace log and no return statement
// are visible here (the embedded numbering skips 59 and 63) - presumably both
// exist in the full file; confirm.
57 override registerProvider(Provider provider, BundleContext ctx) {
58 checkPredicates(provider);
60 val session = newSessionFor(provider, ctx);
// The provider is notified before the session is added to the tracked set.
61 provider.onSessionInitiated(session);
62 providerSessions.add(session);
// Invokes the given RPC with the given input by delegating to the router and
// returns the router's ListenableFuture of the RPC result unchanged.
66 protected def ListenableFuture<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
67 return router.invokeRpc(rpc, input);
// Validates a provider before registration. Throws IllegalArgumentException
// with "Provider should not be null." and IllegalStateException when an
// existing provider session already holds an equal provider.
// NOTE(review): as shown, the first throw is unconditional and makes the loop
// unreachable; the embedded numbering skips 72, so a guard (presumably a null
// check on prov) appears to be missing from this extract - confirm.
71 private def void checkPredicates(Provider prov) {
73 throw new IllegalArgumentException("Provider should not be null.");
// NOTE(review): providerSessions is a Collections.synchronizedSet; the JDK
// requires callers to synchronize on the set while iterating it, and no such
// lock is visible here - verify whether concurrent registration is possible.
74 for (ProviderContextImpl session : providerSessions) {
75 if (prov.equals(session.getProvider()))
76 throw new IllegalStateException("Provider already registered");
// Validates a consumer before registration. Throws IllegalArgumentException
// with "Consumer should not be null." and IllegalStateException when an
// existing consumer session already holds an equal consumer.
// NOTE(review): as shown, the first throw is unconditional and makes the loop
// unreachable; the embedded numbering skips 82, so a guard (presumably a null
// check on cons) appears to be missing from this extract - confirm.
81 private def void checkPredicates(Consumer cons) {
83 throw new IllegalArgumentException("Consumer should not be null.");
// NOTE(review): sessions is a Collections.synchronizedSet; the JDK requires
// callers to synchronize on the set while iterating it, and no such lock is
// visible here - verify whether concurrent registration is possible.
84 for (ConsumerContextImpl session : sessions) {
85 if (cons.equals(session.getConsumer()))
86 throw new IllegalStateException("Consumer already registered");
90 // Private Factory methods
// Factory for consumer sessions: constructs a ConsumerContextImpl from the
// consumer and its bundle context. Note that the Consumer parameter is named
// `provider`.
// NOTE(review): the lines following the construction are not visible in this
// extract (embedded numbering jumps from 92 to 97) - presumably the new
// context is configured and returned; confirm.
91 private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
92 val ret = new ConsumerContextImpl(provider, ctx);
// Factory for provider sessions: constructs a ProviderContextImpl from the
// provider and its bundle context.
// NOTE(review): the lines following the construction are not visible in this
// extract (embedded numbering jumps from 98 to 103) - presumably the new
// context is configured and returned; confirm.
97 private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
98 val ret = new ProviderContextImpl(provider, ctx);
// Stops tracking a closed session by removing it from both session sets.
103 protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
104 sessions.remove(consumerContextImpl);
// NOTE(review): the argument is typed as ConsumerContextImpl but is removed
// from the provider set as well - presumably ProviderContextImpl is a subtype
// so closed provider sessions are dropped here too; confirm.
105 providerSessions.remove(consumerContextImpl);
// Closes the broker by closing the deactivator. Any exception raised by the
// deactivator's close() propagates to the caller.
108 override close() throws Exception {
// Null-safe call: nothing happens when no deactivator has been set.
109 deactivator?.close();
// Registers an implementation for the given RPC type by delegating to the
// router. Declared to throw IllegalArgumentException.
// NOTE(review): only the delegating call is visible; whether its result is
// returned depends on lines not shown in this extract - confirm.
112 override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
113 router.addRpcImplementation(rpcType,implementation);
// Registers a routed implementation for the given RPC type by delegating to
// the router.
// NOTE(review): only the delegating call is visible; whether its result is
// returned depends on lines not shown in this extract - confirm.
116 override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
117 router.addRoutedRpcImplementation(rpcType,implementation);
// Forwards the default implementation for routed RPCs to the router.
120 override setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
121 router.setRoutedRpcDefaultDelegate(defaultImplementation);
// Adds an RPC registration listener by delegating to the router and returns
// whatever the router returns for that registration.
124 override addRpcRegistrationListener(RpcRegistrationListener listener) {
125 return router.addRpcRegistrationListener(listener);
// Registers a route change listener (any subtype L of
// RouteChangeListener<RpcRoutingContext, InstanceIdentifier>) by delegating to
// the router and returns the router's result.
128 override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
129 return router.registerRouteChangeListener(listener);
// Returns the supported RPCs exactly as reported by the router.
132 override getSupportedRpcs() {
133 return router.getSupportedRpcs();
// Invokes the given RPC with the given input by delegating to the router and
// returns the router's result; this is the same router call that
// invokeRpcAsync makes.
136 override invokeRpc(QName rpc, CompositeNode input) {
137 return router.invokeRpc(rpc,input)