Merge "Added DELETE support for Bridge and Port resources"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections
11 import java.util.HashMap
12 import java.util.HashSet
13 import java.util.Map
14 import java.util.Set
15 import java.util.concurrent.Callable
16 import java.util.concurrent.ExecutorService
17 import java.util.concurrent.Executors
18 import java.util.concurrent.Future
19 import org.opendaylight.controller.sal.core.api.Broker
20 import org.opendaylight.controller.sal.core.api.BrokerService
21 import org.opendaylight.controller.sal.core.api.Consumer
22 import org.opendaylight.controller.sal.core.api.Provider
23 import org.opendaylight.controller.sal.core.spi.BrokerModule
24 import org.opendaylight.yangtools.yang.common.QName
25 import org.opendaylight.yangtools.yang.common.RpcResult
26 import org.opendaylight.yangtools.yang.data.api.CompositeNode
27 import org.osgi.framework.BundleContext
28 import org.slf4j.LoggerFactory
29 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
30 import org.opendaylight.yangtools.concepts.ListenerRegistration
31 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
32
/**
 * {@link Broker} implementation that keeps track of registered consumer and
 * provider sessions and of the {@link BrokerModule} instances that supply
 * {@link BrokerService} implementations for those sessions.
 */
public class BrokerImpl implements Broker {
    private static val log = LoggerFactory.getLogger(BrokerImpl);

    // Broker Generic Context
    // All collections below are wrapped in Collections.synchronized* views.
    private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
    private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
        new HashSet<ProviderContextImpl>());
    // Modules accepted by addModule; consulted to reject duplicate registrations.
    private val Set<BrokerModule> modules = Collections.synchronizedSet(new HashSet<BrokerModule>());
    // Maps each provided service type to the module that serves it.
    private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
        synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());

    // Implementation specific
    // Executor on which invokeRpc runs the routed RPC call.
    @Property
    private var ExecutorService executor = Executors.newFixedThreadPool(5);
    @Property
    private var BundleContext bundleContext;

    // Router that invokeRpc delegates to; it has no default value here.
    @Property
    private var RpcRouter router;

    /**
     * Registers a consumer and opens a session for it.
     *
     * The consumer is notified through onSessionInitiated before the session
     * is recorded in the set of open sessions.
     *
     * @param consumer consumer to register; must not be null or already registered
     * @param ctx bundle context handed to the new session
     * @return the newly created consumer session
     * @throws IllegalArgumentException if consumer is null
     * @throws IllegalStateException if the consumer already has a session
     */
    override registerConsumer(Consumer consumer, BundleContext ctx) {
        checkPredicates(consumer);
        log.info("Registering consumer " + consumer);
        val session = newSessionFor(consumer, ctx);
        consumer.onSessionInitiated(session);
        sessions.add(session);
        return session;
    }

    /**
     * Registers a provider and opens a session for it.
     *
     * The provider is notified through onSessionInitiated before the session
     * is recorded in the set of open provider sessions.
     *
     * @param provider provider to register; must not be null or already registered
     * @param ctx bundle context handed to the new session
     * @return the newly created provider session
     * @throws IllegalArgumentException if provider is null
     * @throws IllegalStateException if the provider already has a session
     */
    override registerProvider(Provider provider, BundleContext ctx) {
        checkPredicates(provider);

        val session = newSessionFor(provider, ctx);
        provider.onSessionInitiated(session);
        providerSessions.add(session);
        return session;
    }

    /**
     * Registers a broker module and publishes every service type it provides.
     *
     * @param module module to register
     * @throws IllegalArgumentException if the module has already been registered
     */
    public def addModule(BrokerModule module) {
        log.info("Registering broker module " + module);
        if(modules.contains(module)) {
            log.error("Module already registered");
            throw new IllegalArgumentException("Module already exists.");
        }

        val provServices = module.getProvidedServices();

        // Remember the module so that the duplicate check above can trigger on a
        // later call. It is recorded only after getProvidedServices() returned,
        // so a module that fails there is not left half-registered.
        modules.add(module);

        // Keep the loop as the last expression: the inferred return type stays void.
        for (Class<? extends BrokerService> serviceType : provServices) {
            log.info("  Registering session service implementation: " + serviceType.getCanonicalName());
            serviceProviders.put(serviceType, module);
        }
    }

    /**
     * Looks up the implementation of a service for the given session.
     *
     * @param service requested service type
     * @param session session on whose behalf the service is requested
     * @return the instance returned by the owning module, or null (after logging
     *         a warning) when no module provides the requested type
     */
    public def <T extends BrokerService> T serviceFor(Class<T> service, ConsumerContextImpl session) {
        val prov = serviceProviders.get(service);
        if(prov == null) {
            log.warn("Service " + service.toString() + " is not supported");
            return null;
        }
        return prov.getServiceForSession(service, session);
    }

    /**
     * Submits an RPC invocation to the executor; the task delegates to the router.
     *
     * @param rpc name of the RPC to invoke
     * @param input input passed to the router
     * @return future of the result produced by the router
     */
    protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
        val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
        return result;
    }

    // Validation

    /**
     * Rejects a null provider and a provider that already owns a session.
     *
     * NOTE(review): the loop iterates a Collections.synchronizedSet view without
     * holding its lock; the JDK asks callers to synchronize manually while
     * iterating - confirm whether concurrent registration is expected here.
     */
    private def void checkPredicates(Provider prov) {
        if(prov == null)
            throw new IllegalArgumentException("Provider should not be null.");
        for (ProviderContextImpl session : providerSessions) {
            if(prov.equals(session.getProvider()))
                throw new IllegalStateException("Provider already registered");
        }

    }

    /**
     * Rejects a null consumer and a consumer that already owns a session.
     *
     * NOTE(review): same unsynchronized iteration caveat as the provider variant.
     */
    private def void checkPredicates(Consumer cons) {
        if(cons == null)
            throw new IllegalArgumentException("Consumer should not be null.");
        for (ConsumerContextImpl session : sessions) {
            if(cons.equals(session.getConsumer()))
                throw new IllegalStateException("Consumer already registered");
        }
    }

    // Private Factory methods

    /** Creates a consumer session bound to this broker. */
    private def ConsumerContextImpl newSessionFor(Consumer consumer, BundleContext ctx) {
        val ret = new ConsumerContextImpl(consumer, ctx);
        ret.broker = this;
        return ret;
    }

    /** Creates a provider session bound to this broker. */
    private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
        val ret = new ProviderContextImpl(provider, ctx);
        ret.broker = this;
        return ret;
    }

    /**
     * Drops a closed session from the broker's bookkeeping.
     *
     * The session is removed from both session sets.
     * NOTE(review): presumably ProviderContextImpl is a subtype of
     * ConsumerContextImpl, so provider sessions closed through this path are
     * removed as well - confirm against ProviderContextImpl.
     */
    protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
        sessions.remove(consumerContextImpl);
        providerSessions.remove(consumerContextImpl);
    }
}