2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.dom.broker;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import org.opendaylight.controller.sal.core.api.Broker;
20 import org.opendaylight.controller.sal.core.api.BrokerService;
21 import org.opendaylight.controller.sal.core.api.Consumer;
22 import org.opendaylight.controller.sal.core.api.Provider;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.osgi.framework.BundleContext;
29 import org.slf4j.LoggerFactory;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration
31 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
32 import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
34 public class BrokerImpl implements Broker {
35 private static val log = LoggerFactory.getLogger(BrokerImpl);
37 // Broker Generic Context
38 private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
39 private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
40 new HashSet<ProviderContextImpl>());
41 private val Set<BrokerModule> modules = Collections.synchronizedSet(new HashSet<BrokerModule>());
42 private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
43 synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
46 private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
48 private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
49 new HashMap<QName, RpcImplementation>());
51 // Implementation specific
53 private var ExecutorService executor = Executors.newFixedThreadPool(5);
55 private var BundleContext bundleContext;
57 override registerConsumer(Consumer consumer, BundleContext ctx) {
58 checkPredicates(consumer);
59 log.info("Registering consumer " + consumer);
60 val session = newSessionFor(consumer, ctx);
61 consumer.onSessionInitiated(session);
62 sessions.add(session);
66 override registerProvider(Provider provider, BundleContext ctx) {
67 checkPredicates(provider);
69 val session = newSessionFor(provider, ctx);
70 provider.onSessionInitiated(session);
71 providerSessions.add(session);
75 public def addModule(BrokerModule module) {
76 log.info("Registering broker module " + module);
77 if(modules.contains(module)) {
78 log.error("Module already registered");
79 throw new IllegalArgumentException("Module already exists.");
82 val provServices = module.getProvidedServices();
83 for (Class<? extends BrokerService> serviceType : provServices) {
84 log.info(" Registering session service implementation: " + serviceType.getCanonicalName());
85 serviceProviders.put(serviceType, module);
89 public def <T extends BrokerService> T serviceFor(Class<T> service, ConsumerContextImpl session) {
90 val prov = serviceProviders.get(service);
92 log.warn("Service " + service.toString() + " is not supported");
95 return prov.getServiceForSession(service, session);
99 protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
100 if(rpcImpls.get(rpcType) != null) {
101 throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
105 rpcImpls.put(rpcType, implementation);
108 for(listener : rpcRegistrationListeners.listeners) {
110 listener.instance.onRpcImplementationAdded(rpcType);
111 } catch (Exception e){
112 log.error("Unhandled exception during invoking listener",e);
117 protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
118 if(implToRemove == rpcImpls.get(rpcType)) {
119 rpcImpls.remove(rpcType);
122 for(listener : rpcRegistrationListeners.listeners) {
124 listener.instance.onRpcImplementationRemoved(rpcType);
125 } catch (Exception e){
126 log.error("Unhandled exception during invoking listener",e);
131 protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
132 val impl = rpcImpls.get(rpc);
133 val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
138 private def void checkPredicates(Provider prov) {
140 throw new IllegalArgumentException("Provider should not be null.");
141 for (ProviderContextImpl session : providerSessions) {
142 if(prov.equals(session.getProvider()))
143 throw new IllegalStateException("Provider already registered");
148 private def void checkPredicates(Consumer cons) {
150 throw new IllegalArgumentException("Consumer should not be null.");
151 for (ConsumerContextImpl session : sessions) {
152 if(cons.equals(session.getConsumer()))
153 throw new IllegalStateException("Consumer already registered");
157 // Private Factory methods
158 private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
159 val ret = new ConsumerContextImpl(provider, ctx);
164 private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
165 val ret = new ProviderContextImpl(provider, ctx);
170 protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
171 sessions.remove(consumerContextImpl);
172 providerSessions.remove(consumerContextImpl);
175 protected def getSupportedRpcs() {
179 def ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
180 rpcRegistrationListeners.register(listener);