Fixed inappropriate uses of log level INFO
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections
11 import java.util.HashMap
12 import java.util.HashSet
13 import java.util.Map
14 import java.util.Set
15 import java.util.concurrent.Callable
16 import java.util.concurrent.ExecutorService
17 import java.util.concurrent.Executors
18 import java.util.concurrent.Future
19 import org.opendaylight.controller.sal.core.api.Broker
20 import org.opendaylight.controller.sal.core.api.BrokerService
21 import org.opendaylight.controller.sal.core.api.Consumer
22 import org.opendaylight.controller.sal.core.api.Provider
23 import org.opendaylight.controller.sal.core.spi.BrokerModule
24 import org.opendaylight.yangtools.yang.common.QName
25 import org.opendaylight.yangtools.yang.common.RpcResult
26 import org.opendaylight.yangtools.yang.data.api.CompositeNode
27 import org.osgi.framework.BundleContext
28 import org.slf4j.LoggerFactory
29 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
30 import org.opendaylight.yangtools.concepts.ListenerRegistration
31 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
32 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
33 import org.opendaylight.controller.sal.core.api.RpcImplementation
34 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
35 import org.opendaylight.controller.sal.core.api.RpcRoutingContext
36 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
37
/**
 * Broker implementation that creates consumer and provider sessions, keeps
 * track of the sessions it has handed out, and delegates RPC registration,
 * listener registration and RPC invocation to the configured {@link RpcRouter}.
 */
public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
    private static val log = LoggerFactory.getLogger(BrokerImpl);

    // Broker Generic Context
    // Both registries are synchronized wrappers: individual calls such as add,
    // remove and toArray are atomic, but iterating the live set is not.
    private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
    private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
        new HashSet<ProviderContextImpl>());

    // Implementation specific
    /** Executor on which {@link #invokeRpc} submits router calls; defaults to a fixed pool of 5 threads. */
    @Property
    private var ExecutorService executor = Executors.newFixedThreadPool(5);
    @Property
    private var BundleContext bundleContext;

    /** Optional resource closed by {@link #close}; may be left unset. */
    @Property
    private var AutoCloseable deactivator;

    /** Router that every RPC-related call of this broker is forwarded to. */
    @Property
    private var RpcRouter router;

    /**
     * Registers a consumer and returns the session created for it.
     * The consumer's {@code onSessionInitiated} callback is invoked before the
     * session is recorded in the set of registered sessions.
     *
     * @param consumer consumer to register
     * @param ctx bundle context handed to the new session
     * @return the newly created consumer session
     * @throws IllegalArgumentException if {@code consumer} is null
     * @throws IllegalStateException if an equal consumer is already registered
     */
    override registerConsumer(Consumer consumer, BundleContext ctx) {
        checkPredicates(consumer);
        // Parameterized message: the consumer is only stringified when TRACE is enabled.
        log.trace("Registering consumer {}", consumer);
        val session = newSessionFor(consumer, ctx);
        consumer.onSessionInitiated(session);
        sessions.add(session);
        return session;
    }

    /**
     * Registers a provider and returns the session created for it.
     * The provider's {@code onSessionInitiated} callback is invoked before the
     * session is recorded in the set of registered provider sessions.
     *
     * @param provider provider to register
     * @param ctx bundle context handed to the new session
     * @return the newly created provider session
     * @throws IllegalArgumentException if {@code provider} is null
     * @throws IllegalStateException if an equal provider is already registered
     */
    override registerProvider(Provider provider, BundleContext ctx) {
        checkPredicates(provider);
        log.trace("Registering provider {}", provider);
        val session = newSessionFor(provider, ctx);
        provider.onSessionInitiated(session);
        providerSessions.add(session);
        return session;
    }

    /**
     * Submits the RPC to the executor; the submitted task calls the router.
     *
     * @param rpc name of the RPC to invoke
     * @param input input passed to the router unchanged
     * @return future completing with whatever the router returns
     */
    protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
        return executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
    }

    // Validation
    /**
     * Rejects a null provider and a provider equal to one that already has a session.
     * NOTE(review): this check and the later add in registerProvider are two
     * separate steps, so two concurrent registrations of the same provider can
     * both pass the check.
     */
    private def void checkPredicates(Provider prov) {
        if (prov == null)
            throw new IllegalArgumentException("Provider should not be null.");
        // Iterate over a snapshot: toArray() is one synchronized call on the wrapper,
        // whereas iterating the live synchronized set requires holding its lock.
        for (Object registered : providerSessions.toArray()) {
            if (prov.equals((registered as ProviderContextImpl).getProvider()))
                throw new IllegalStateException("Provider already registered");
        }
    }

    /**
     * Rejects a null consumer and a consumer equal to one that already has a session.
     * NOTE(review): this check and the later add in registerConsumer are two
     * separate steps, so two concurrent registrations of the same consumer can
     * both pass the check.
     */
    private def void checkPredicates(Consumer cons) {
        if (cons == null)
            throw new IllegalArgumentException("Consumer should not be null.");
        // Snapshot for the same reason as in the provider variant.
        for (Object registered : sessions.toArray()) {
            if (cons.equals((registered as ConsumerContextImpl).getConsumer()))
                throw new IllegalStateException("Consumer already registered");
        }
    }

    // Private Factory methods
    /** Creates a consumer session bound to this broker. */
    private def ConsumerContextImpl newSessionFor(Consumer consumer, BundleContext ctx) {
        val ret = new ConsumerContextImpl(consumer, ctx);
        ret.broker = this;
        return ret;
    }

    /** Creates a provider session bound to this broker. */
    private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
        val ret = new ProviderContextImpl(provider, ctx);
        ret.broker = this;
        return ret;
    }

    /**
     * Forgets a closed session by removing it from both registries.
     * NOTE(review): the removal from providerSessions presumably covers provider
     * sessions that are closed through this same callback; confirm against
     * ProviderContextImpl.
     */
    protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
        sessions.remove(consumerContextImpl);
        providerSessions.remove(consumerContextImpl);
    }

    /** Closes the deactivator if one has been set; otherwise does nothing. */
    override close() throws Exception {
        deactivator?.close();
    }

    /** Delegates to the router. */
    override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
        router.addRpcImplementation(rpcType, implementation);
    }

    /** Delegates to the router. */
    override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
        router.addRoutedRpcImplementation(rpcType, implementation);
    }

    /** Delegates to the router and returns its registration. */
    override addRpcRegistrationListener(RpcRegistrationListener listener) {
        return router.addRpcRegistrationListener(listener);
    }

    /** Delegates to the router and returns its registration. */
    override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
        return router.registerRouteChangeListener(listener);
    }

}