2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.dom.broker;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import org.opendaylight.controller.sal.core.api.Broker;
19 import org.opendaylight.controller.sal.core.api.BrokerService;
20 import org.opendaylight.controller.sal.core.api.Consumer;
21 import org.opendaylight.controller.sal.core.api.Provider;
22 import org.opendaylight.controller.sal.core.api.RpcImplementation;
23 import org.opendaylight.controller.sal.core.spi.BrokerModule;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import org.opendaylight.yangtools.yang.common.RpcResult;
26 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
27 import org.osgi.framework.BundleContext;
28 import org.slf4j.LoggerFactory;
30 public class BrokerImpl implements Broker {
31 private static val log = LoggerFactory.getLogger(BrokerImpl);
33 // Broker Generic Context
34 private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
35 private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
36 new HashSet<ProviderContextImpl>());
37 private val Set<BrokerModule> modules = Collections.synchronizedSet(new HashSet<BrokerModule>());
38 private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
39 synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
42 private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
43 new HashMap<QName, RpcImplementation>());
45 // Implementation specific
47 private var ExecutorService executor;
49 private var BundleContext bundleContext;
51 override registerConsumer(Consumer consumer, BundleContext ctx) {
52 checkPredicates(consumer);
53 log.info("Registering consumer " + consumer);
54 val session = newSessionFor(consumer, ctx);
55 consumer.onSessionInitiated(session);
56 sessions.add(session);
60 override registerProvider(Provider provider, BundleContext ctx) {
61 checkPredicates(provider);
63 val session = newSessionFor(provider, ctx);
64 provider.onSessionInitiated(session);
65 providerSessions.add(session);
69 public def addModule(BrokerModule module) {
70 log.info("Registering broker module " + module);
71 if(modules.contains(module)) {
72 log.error("Module already registered");
73 throw new IllegalArgumentException("Module already exists.");
76 val provServices = module.getProvidedServices();
77 for (Class<? extends BrokerService> serviceType : provServices) {
78 log.info(" Registering session service implementation: " + serviceType.getCanonicalName());
79 serviceProviders.put(serviceType, module);
83 public def <T extends BrokerService> T serviceFor(Class<T> service, ConsumerContextImpl session) {
84 val prov = serviceProviders.get(service);
86 log.warn("Service " + service.toString() + " is not supported");
89 return prov.getServiceForSession(service, session);
93 protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
94 if(rpcImpls.get(rpcType) != null) {
95 throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
98 //TODO: Add notification for availability of Rpc Implementation
99 rpcImpls.put(rpcType, implementation);
102 protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
103 if(implToRemove == rpcImpls.get(rpcType)) {
104 rpcImpls.remove(rpcType);
108 protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
109 val impl = rpcImpls.get(rpc);
110 val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
115 private def void checkPredicates(Provider prov) {
117 throw new IllegalArgumentException("Provider should not be null.");
118 for (ProviderContextImpl session : providerSessions) {
119 if(prov.equals(session.getProvider()))
120 throw new IllegalStateException("Provider already registered");
125 private def void checkPredicates(Consumer cons) {
127 throw new IllegalArgumentException("Consumer should not be null.");
128 for (ConsumerContextImpl session : sessions) {
129 if(cons.equals(session.getConsumer()))
130 throw new IllegalStateException("Consumer already registered");
134 // Private Factory methods
135 private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
136 val ret = new ConsumerContextImpl(provider, ctx);
141 private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
142 val ret = new ProviderContextImpl(provider, ctx);
147 protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
148 sessions.remove(consumerContextImpl);
149 providerSessions.remove(consumerContextImpl);