Merge "BUG-421: Define multipart-transaction-aware"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections
11 import java.util.HashSet
12 import java.util.Set
13 import java.util.concurrent.Callable
14 import java.util.concurrent.ExecutorService
15 import java.util.concurrent.Executors
16 import java.util.concurrent.Future
17 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
18 import org.opendaylight.controller.sal.core.api.Broker
19 import org.opendaylight.controller.sal.core.api.Consumer
20 import org.opendaylight.controller.sal.core.api.Provider
21 import org.opendaylight.controller.sal.core.api.RpcImplementation
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
23 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
24 import org.opendaylight.controller.sal.core.api.RpcRoutingContext
25 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
26 import org.opendaylight.yangtools.yang.common.QName
27 import org.opendaylight.yangtools.yang.common.RpcResult
28 import org.opendaylight.yangtools.yang.data.api.CompositeNode
29 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
30 import org.osgi.framework.BundleContext
31 import org.slf4j.LoggerFactory
32
33 public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
34     private static val log = LoggerFactory.getLogger(BrokerImpl);
35
36     // Broker Generic Context
37     private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
38     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
39         new HashSet<ProviderContextImpl>());
40
41     // Implementation specific
42     @Property
43     private var ExecutorService executor = Executors.newFixedThreadPool(5);
44     @Property
45     private var BundleContext bundleContext;
46     
47     @Property
48     private var AutoCloseable deactivator;
49
50     @Property
51     private var RpcRouter router;
52
53     override registerConsumer(Consumer consumer, BundleContext ctx) {
54         checkPredicates(consumer);
55         log.trace("Registering consumer " + consumer);
56         val session = newSessionFor(consumer, ctx);
57         consumer.onSessionInitiated(session);
58         sessions.add(session);
59         return session;
60     }
61
62     override registerProvider(Provider provider, BundleContext ctx) {
63         checkPredicates(provider);
64
65         val session = newSessionFor(provider, ctx);
66         provider.onSessionInitiated(session);
67         providerSessions.add(session);
68         return session;
69     }
70
71     protected def Future<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
72         val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
73         return result;
74     }
75
76     // Validation
77     private def void checkPredicates(Provider prov) {
78         if (prov == null)
79             throw new IllegalArgumentException("Provider should not be null.");
80         for (ProviderContextImpl session : providerSessions) {
81             if (prov.equals(session.getProvider()))
82                 throw new IllegalStateException("Provider already registered");
83         }
84
85     }
86
87     private def void checkPredicates(Consumer cons) {
88         if (cons == null)
89             throw new IllegalArgumentException("Consumer should not be null.");
90         for (ConsumerContextImpl session : sessions) {
91             if (cons.equals(session.getConsumer()))
92                 throw new IllegalStateException("Consumer already registered");
93         }
94     }
95
96     // Private Factory methods
97     private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
98         val ret = new ConsumerContextImpl(provider, ctx);
99         ret.broker = this;
100         return ret;
101     }
102
103     private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
104         val ret = new ProviderContextImpl(provider, ctx);
105         ret.broker = this;
106         return ret;
107     }
108
109     protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
110         sessions.remove(consumerContextImpl);
111         providerSessions.remove(consumerContextImpl);
112     }
113     
114     override close() throws Exception {
115         deactivator?.close();
116     }
117     
118     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
119         router.addRpcImplementation(rpcType,implementation);
120     }
121     
122     override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
123         router.addRoutedRpcImplementation(rpcType,implementation);
124     }
125     
126     override addRpcRegistrationListener(RpcRegistrationListener listener) {
127         return router.addRpcRegistrationListener(listener);
128     }
129     
130     override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
131         return router.registerRouteChangeListener(listener);
132     }
133
134     override invokeRpc(QName rpc,CompositeNode input){
135         return router.invokeRpc(rpc,input)
136     }
137
138     override getSupportedRpcs() {
139         return router.getSupportedRpcs();
140     }
141     
142 }