Migrated dom.BrokerImpl to xtend code.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.Map;
14 import java.util.Set;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import org.opendaylight.controller.sal.core.api.Broker;
19 import org.opendaylight.controller.sal.core.api.BrokerService;
20 import org.opendaylight.controller.sal.core.api.Consumer;
21 import org.opendaylight.controller.sal.core.api.Provider;
22 import org.opendaylight.controller.sal.core.api.RpcImplementation;
23 import org.opendaylight.controller.sal.core.spi.BrokerModule;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import org.opendaylight.yangtools.yang.common.RpcResult;
26 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
27 import org.osgi.framework.BundleContext;
28 import org.slf4j.LoggerFactory;
29
30 public class BrokerImpl implements Broker {
31     private static val log = LoggerFactory.getLogger(BrokerImpl);
32
33     // Broker Generic Context
34     private val Set<ConsumerContextImpl> sessions = Collections.synchronizedSet(new HashSet<ConsumerContextImpl>());
35     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
36         new HashSet<ProviderContextImpl>());
37     private val Set<BrokerModule> modules = Collections.synchronizedSet(new HashSet<BrokerModule>());
38     private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
39         synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
40
41     // RPC Context
42     private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
43         new HashMap<QName, RpcImplementation>());
44
45     // Implementation specific
46     @Property
47     private var ExecutorService executor;
48     @Property
49     private var BundleContext bundleContext;
50
51     override registerConsumer(Consumer consumer, BundleContext ctx) {
52         checkPredicates(consumer);
53         log.info("Registering consumer " + consumer);
54         val session = newSessionFor(consumer, ctx);
55         consumer.onSessionInitiated(session);
56         sessions.add(session);
57         return session;
58     }
59
60     override registerProvider(Provider provider, BundleContext ctx) {
61         checkPredicates(provider);
62
63         val session = newSessionFor(provider, ctx);
64         provider.onSessionInitiated(session);
65         providerSessions.add(session);
66         return session;
67     }
68
69     public def addModule(BrokerModule module) {
70         log.info("Registering broker module " + module);
71         if(modules.contains(module)) {
72             log.error("Module already registered");
73             throw new IllegalArgumentException("Module already exists.");
74         }
75
76         val provServices = module.getProvidedServices();
77         for (Class<? extends BrokerService> serviceType : provServices) {
78             log.info("  Registering session service implementation: " + serviceType.getCanonicalName());
79             serviceProviders.put(serviceType, module);
80         }
81     }
82
83     public def <T extends BrokerService> T serviceFor(Class<T> service, ConsumerContextImpl session) {
84         val prov = serviceProviders.get(service);
85         if(prov == null) {
86             log.warn("Service " + service.toString() + " is not supported");
87             return null;
88         }
89         return prov.getServiceForSession(service, session);
90     }
91
92     // RPC Functionality
93     protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
94         if(rpcImpls.get(rpcType) != null) {
95             throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
96         }
97
98         //TODO: Add notification for availability of Rpc Implementation
99         rpcImpls.put(rpcType, implementation);
100     }
101
102     protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
103         if(implToRemove == rpcImpls.get(rpcType)) {
104             rpcImpls.remove(rpcType);
105         }
106     }
107
108     protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
109         val impl = rpcImpls.get(rpc);
110         val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
111         return result;
112     }
113
114     // Validation
115     private def void checkPredicates(Provider prov) {
116         if(prov == null)
117             throw new IllegalArgumentException("Provider should not be null.");
118         for (ProviderContextImpl session : providerSessions) {
119             if(prov.equals(session.getProvider()))
120                 throw new IllegalStateException("Provider already registered");
121         }
122
123     }
124
125     private def void checkPredicates(Consumer cons) {
126         if(cons == null)
127             throw new IllegalArgumentException("Consumer should not be null.");
128         for (ConsumerContextImpl session : sessions) {
129             if(cons.equals(session.getConsumer()))
130                 throw new IllegalStateException("Consumer already registered");
131         }
132     }
133
134     // Private Factory methods
135     private def ConsumerContextImpl newSessionFor(Consumer provider, BundleContext ctx) {
136         val ret = new ConsumerContextImpl(provider, ctx);
137         ret.broker = this;
138         return ret;
139     }
140
141     private def ProviderContextImpl newSessionFor(Provider provider, BundleContext ctx) {
142         val ret = new ProviderContextImpl(provider, ctx);
143         ret.broker = this;
144         return ret;
145     }
146
147     protected def void consumerSessionClosed(ConsumerContextImpl consumerContextImpl) {
148         sessions.remove(consumerContextImpl);
149         providerSessions.remove(consumerContextImpl);
150     }
151 }