2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.dom.broker;
10 import java.util.Collections
11 import java.util.HashMap
12 import java.util.HashSet
15 import java.util.concurrent.Callable
16 import java.util.concurrent.ExecutorService
17 import java.util.concurrent.Executors
18 import java.util.concurrent.Future
19 import org.opendaylight.controller.sal.core.api.Broker
20 import org.opendaylight.controller.sal.core.api.BrokerService
21 import org.opendaylight.controller.sal.core.api.Consumer
22 import org.opendaylight.controller.sal.core.api.Provider
23 import org.opendaylight.controller.sal.core.spi.BrokerModule
24 import org.opendaylight.yangtools.yang.common.QName
25 import org.opendaylight.yangtools.yang.common.RpcResult
26 import org.opendaylight.yangtools.yang.data.api.CompositeNode
27 import org.osgi.framework.BundleContext
28 import org.slf4j.LoggerFactory
29 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
30 import org.opendaylight.yangtools.concepts.ListenerRegistration
31 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
32 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
33 import org.opendaylight.controller.sal.core.api.RpcImplementation
35 public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
36 private static val log = LoggerFactory.getLogger(BrokerImpl);
38 // Broker Generic Context
39 private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
40 private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
41 new HashSet<ProviderContextImpl>());
43 // Implementation specific
45 private var ExecutorService executor = Executors.newFixedThreadPool(5);
47 private var BundleContext bundleContext;
50 private var AutoCloseable deactivator;
53 private var RpcRouter router;
55 override registerConsumer(Consumer consumer, BundleContext ctx) {
56 checkPredicates(consumer);
57 log.info("Registering consumer " + consumer);
58 val session = newSessionFor(consumer, ctx);
59 consumer.onSessionInitiated(session);
60 sessions.add(session);
64 override registerProvider(Provider provider, BundleContext ctx) {
65 checkPredicates(provider);
67 val session = newSessionFor(provider, ctx);
68 provider.onSessionInitiated(session);
69 providerSessions.add(session);
73 protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
74 val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
79 private def void checkPredicates(Provider prov) {
81 throw new IllegalArgumentException("Provider should not be null.");
82 for (ProviderContextImpl session : providerSessions) {
83 if (prov.equals(session.getProvider()))
84 throw new IllegalStateException("Provider already registered");
89 private def void checkPredicates(Consumer cons) {
91 throw new IllegalArgumentException("Consumer should not be null.");
92 for (ConsumerContextImpl session : sessions) {
93 if (cons.equals(session.getConsumer()))
94 throw new IllegalStateException("Consumer already registered");
98 // Private Factory methods
99 private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
100 val ret = new ConsumerContextImpl(provider, ctx);
105 private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
106 val ret = new ProviderContextImpl(provider, ctx);
111 protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
112 sessions.remove(consumerContextImpl);
113 providerSessions.remove(consumerContextImpl);
116 override close() throws Exception {
117 deactivator?.close();
120 override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
121 router.addRpcImplementation(rpcType,implementation);
124 override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
125 router.addRoutedRpcImplementation(rpcType,implementation);