Merge "Bug 383 - exception raised if key part of URI is incorrect"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections
11 import java.util.HashSet
12 import java.util.Set
13 import java.util.concurrent.Callable
14 import java.util.concurrent.ExecutorService
15 import java.util.concurrent.Executors
16 import java.util.concurrent.Future
17 import org.opendaylight.controller.sal.core.api.Broker
18 import org.opendaylight.controller.sal.core.api.Consumer
19 import org.opendaylight.controller.sal.core.api.Provider
20 import org.opendaylight.yangtools.yang.common.QName
21 import org.opendaylight.yangtools.yang.common.RpcResult
22 import org.opendaylight.yangtools.yang.data.api.CompositeNode
23 import org.osgi.framework.BundleContext
24 import org.slf4j.LoggerFactory
25 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
26 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
27 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
28 import org.opendaylight.controller.sal.core.api.RpcImplementation
29 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
30 import org.opendaylight.controller.sal.core.api.RpcRoutingContext
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
32 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
33
34 public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
35     private static val log = LoggerFactory.getLogger(BrokerImpl);
36
37     // Broker Generic Context
38     private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
39     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
40         new HashSet<ProviderContextImpl>());
41
42     // Implementation specific
43     @Property
44     private var ExecutorService executor = Executors.newFixedThreadPool(5);
45     @Property
46     private var BundleContext bundleContext;
47     
48     @Property
49     private var AutoCloseable deactivator;
50
51     @Property
52     private var RpcRouter router;
53
54     override registerConsumer(Consumer consumer, BundleContext ctx) {
55         checkPredicates(consumer);
56         log.trace("Registering consumer " + consumer);
57         val session = newSessionFor(consumer, ctx);
58         consumer.onSessionInitiated(session);
59         sessions.add(session);
60         return session;
61     }
62
63     override registerProvider(Provider provider, BundleContext ctx) {
64         checkPredicates(provider);
65
66         val session = newSessionFor(provider, ctx);
67         provider.onSessionInitiated(session);
68         providerSessions.add(session);
69         return session;
70     }
71
72     protected def Future<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
73         val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
74         return result;
75     }
76
77     // Validation
78     private def void checkPredicates(Provider prov) {
79         if (prov == null)
80             throw new IllegalArgumentException("Provider should not be null.");
81         for (ProviderContextImpl session : providerSessions) {
82             if (prov.equals(session.getProvider()))
83                 throw new IllegalStateException("Provider already registered");
84         }
85
86     }
87
88     private def void checkPredicates(Consumer cons) {
89         if (cons == null)
90             throw new IllegalArgumentException("Consumer should not be null.");
91         for (ConsumerContextImpl session : sessions) {
92             if (cons.equals(session.getConsumer()))
93                 throw new IllegalStateException("Consumer already registered");
94         }
95     }
96
97     // Private Factory methods
98     private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
99         val ret = new ConsumerContextImpl(provider, ctx);
100         ret.broker = this;
101         return ret;
102     }
103
104     private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
105         val ret = new ProviderContextImpl(provider, ctx);
106         ret.broker = this;
107         return ret;
108     }
109
110     protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
111         sessions.remove(consumerContextImpl);
112         providerSessions.remove(consumerContextImpl);
113     }
114     
115     override close() throws Exception {
116         deactivator?.close();
117     }
118     
119     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
120         router.addRpcImplementation(rpcType,implementation);
121     }
122     
123     override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
124         router.addRoutedRpcImplementation(rpcType,implementation);
125     }
126
127     override setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
128         router.setRoutedRpcDefaultDelegate(defaultImplementation);
129     }
130
131     override addRpcRegistrationListener(RpcRegistrationListener listener) {
132         return router.addRpcRegistrationListener(listener);
133     }
134     
135     override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
136         return router.registerRouteChangeListener(listener);
137     }
138
139     override invokeRpc(QName rpc,CompositeNode input){
140         return router.invokeRpc(rpc,input)
141     }
142
143     override getSupportedRpcs() {
144         return router.getSupportedRpcs();
145     }
146     
147 }