Proper unregistration of provider's session
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections
11 import java.util.HashMap
12 import java.util.HashSet
13 import java.util.Map
14 import java.util.Set
15 import java.util.concurrent.Callable
16 import java.util.concurrent.ExecutorService
17 import java.util.concurrent.Executors
18 import java.util.concurrent.Future
19 import org.opendaylight.controller.sal.core.api.Broker
20 import org.opendaylight.controller.sal.core.api.BrokerService
21 import org.opendaylight.controller.sal.core.api.Consumer
22 import org.opendaylight.controller.sal.core.api.Provider
23 import org.opendaylight.controller.sal.core.api.RpcImplementation
24 import org.opendaylight.controller.sal.core.spi.BrokerModule
25 import org.opendaylight.yangtools.yang.common.QName
26 import org.opendaylight.yangtools.yang.common.RpcResult
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode
28 import org.osgi.framework.BundleContext
29 import org.slf4j.LoggerFactory
30
31 public class BrokerImpl implements Broker {
32     private static val log = LoggerFactory.getLogger(BrokerImpl);
33
34     // Broker Generic Context
35     private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
36     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
37         new HashSet<ProviderContextImpl>());
38     private val Set<BrokerModule> modules = Collections.synchronizedSet(new HashSet<BrokerModule>());
39     private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
40         synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
41
42     // RPC Context
43     private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
44         new HashMap<QName, RpcImplementation>());
45
46     // Implementation specific
47     @Property
48     private var ExecutorService executor = Executors.newFixedThreadPool(5);
49     @Property
50     private var BundleContext bundleContext;
51
52     override registerConsumer(Consumer consumer, BundleContext ctx) {
53         checkPredicates(consumer);
54         log.info("Registering consumer " + consumer);
55         val session = newSessionFor(consumer, ctx);
56         consumer.onSessionInitiated(session);
57         sessions.add(session);
58         return session;
59     }
60
61     override registerProvider(Provider provider, BundleContext ctx) {
62         checkPredicates(provider);
63
64         val session = newSessionFor(provider, ctx);
65         provider.onSessionInitiated(session);
66         providerSessions.add(session);
67         return session;
68     }
69
70     public def addModule(BrokerModule module) {
71         log.info("Registering broker module " + module);
72         if(modules.contains(module)) {
73             log.error("Module already registered");
74             throw new IllegalArgumentException("Module already exists.");
75         }
76
77         val provServices = module.getProvidedServices();
78         for (Class<? extends BrokerService> serviceType : provServices) {
79             log.info("  Registering session service implementation: " + serviceType.getCanonicalName());
80             serviceProviders.put(serviceType, module);
81         }
82     }
83
84     public def <T extends BrokerService> T serviceFor(Class<T> service, ConsumerContextImpl session) {
85         val prov = serviceProviders.get(service);
86         if(prov == null) {
87             log.warn("Service " + service.toString() + " is not supported");
88             return null;
89         }
90         return prov.getServiceForSession(service, session);
91     }
92
93     // RPC Functionality
94     protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
95         if(rpcImpls.get(rpcType) != null) {
96             throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
97         }
98
99         //TODO: Add notification for availability of Rpc Implementation
100         rpcImpls.put(rpcType, implementation);
101     }
102
103     protected def void removeRpcImplementation(QName rpcType) {
104         rpcImpls.remove(rpcType);
105     }
106
107     protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
108         val impl = rpcImpls.get(rpc);
109         val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
110         return result;
111     }
112
113     // Validation
114     private def void checkPredicates(Provider prov) {
115         if(prov == null)
116             throw new IllegalArgumentException("Provider should not be null.");
117         for (ProviderContextImpl session : providerSessions) {
118             if(prov.equals(session.getProvider()))
119                 throw new IllegalStateException("Provider already registered");
120         }
121
122     }
123
124     private def void checkPredicates(Consumer cons) {
125         if(cons == null)
126             throw new IllegalArgumentException("Consumer should not be null.");
127         for (ConsumerContextImpl session : sessions) {
128             if(cons.equals(session.getConsumer()))
129                 throw new IllegalStateException("Consumer already registered");
130         }
131     }
132
133     // Private Factory methods
134     private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
135         val ret = new ConsumerContextImpl(provider, ctx);
136         ret.broker = this;
137         return ret;
138     }
139
140     private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
141         val ret = new ProviderContextImpl(provider, ctx);
142         ret.broker = this;
143         return ret;
144     }
145
146     protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
147         sessions.remove(consumerContextImpl);
148         providerSessions.remove(consumerContextImpl);
149     }
150 }