Added Inventory Reader for SwitchManager
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.Map;
14 import java.util.Set;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import org.opendaylight.controller.sal.core.api.Broker;
20 import org.opendaylight.controller.sal.core.api.BrokerService;
21 import org.opendaylight.controller.sal.core.api.Consumer;
22 import org.opendaylight.controller.sal.core.api.Provider;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.spi.BrokerModule;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.osgi.framework.BundleContext;
29 import org.slf4j.LoggerFactory;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration
31 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
32 import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
33
34 public class BrokerImpl implements Broker {
35     private static val log = LoggerFactory.getLogger(BrokerImpl);
36
37     // Broker Generic Context
38     private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
39     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
40         new HashSet<ProviderContextImpl>());
41     private val Set<BrokerModule> modules = Collections.synchronizedSet(new HashSet<BrokerModule>());
42     private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
43         synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
44
45
46     private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
47     // RPC Context
48     private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
49         new HashMap<QName, RpcImplementation>());
50
51     // Implementation specific
52     @Property
53     private var ExecutorService executor = Executors.newFixedThreadPool(5);
54     @Property
55     private var BundleContext bundleContext;
56
57     override registerConsumer(Consumer consumer, BundleContext ctx) {
58         checkPredicates(consumer);
59         log.info("Registering consumer " + consumer);
60         val session = newSessionFor(consumer, ctx);
61         consumer.onSessionInitiated(session);
62         sessions.add(session);
63         return session;
64     }
65
66     override registerProvider(Provider provider, BundleContext ctx) {
67         checkPredicates(provider);
68
69         val session = newSessionFor(provider, ctx);
70         provider.onSessionInitiated(session);
71         providerSessions.add(session);
72         return session;
73     }
74
75     public def addModule(BrokerModule module) {
76         log.info("Registering broker module " + module);
77         if(modules.contains(module)) {
78             log.error("Module already registered");
79             throw new IllegalArgumentException("Module already exists.");
80         }
81
82         val provServices = module.getProvidedServices();
83         for (Class<? extends BrokerService> serviceType : provServices) {
84             log.info("  Registering session service implementation: " + serviceType.getCanonicalName());
85             serviceProviders.put(serviceType, module);
86         }
87     }
88
89     public def <T extends BrokerService> T serviceFor(Class<T> service, ConsumerContextImpl session) {
90         val prov = serviceProviders.get(service);
91         if(prov == null) {
92             log.warn("Service " + service.toString() + " is not supported");
93             return null;
94         }
95         return prov.getServiceForSession(service, session);
96     }
97
98     // RPC Functionality
99     protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
100         if(rpcImpls.get(rpcType) != null) {
101             throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
102         }
103
104         
105         rpcImpls.put(rpcType, implementation);
106
107         
108         for(listener : rpcRegistrationListeners.listeners)  {
109             try {
110                 listener.instance.onRpcImplementationAdded(rpcType);
111             } catch (Exception e){
112                 log.error("Unhandled exception during invoking listener",e);
113             }
114         }
115     }
116
117     protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
118         if(implToRemove == rpcImpls.get(rpcType)) {
119             rpcImpls.remove(rpcType);
120         }
121         
122         for(listener : rpcRegistrationListeners.listeners)  {
123             try {
124                 listener.instance.onRpcImplementationRemoved(rpcType);
125             } catch (Exception e){
126                 log.error("Unhandled exception during invoking listener",e);
127             }
128         }
129     }
130
131     protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
132         val impl = rpcImpls.get(rpc);
133         val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
134         return result;
135     }
136
137     // Validation
138     private def void checkPredicates(Provider prov) {
139         if(prov == null)
140             throw new IllegalArgumentException("Provider should not be null.");
141         for (ProviderContextImpl session : providerSessions) {
142             if(prov.equals(session.getProvider()))
143                 throw new IllegalStateException("Provider already registered");
144         }
145
146     }
147
148     private def void checkPredicates(Consumer cons) {
149         if(cons == null)
150             throw new IllegalArgumentException("Consumer should not be null.");
151         for (ConsumerContextImpl session : sessions) {
152             if(cons.equals(session.getConsumer()))
153                 throw new IllegalStateException("Consumer already registered");
154         }
155     }
156
157     // Private Factory methods
158     private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
159         val ret = new ConsumerContextImpl(provider, ctx);
160         ret.broker = this;
161         return ret;
162     }
163
164     private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
165         val ret = new ProviderContextImpl(provider, ctx);
166         ret.broker = this;
167         return ret;
168     }
169
170     protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
171         sessions.remove(consumerContextImpl);
172         providerSessions.remove(consumerContextImpl);
173     }
174     
175     protected def getSupportedRpcs() {
176         rpcImpls.keySet;
177     }
178     
179     def ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
180         rpcRegistrationListeners.register(listener);
181     }
182 }