Merge "Complete implementation of DataChangeListenerProxy"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker;
9
10 import java.util.Collections;
11 import java.util.HashSet;
12 import java.util.Set;
13 import java.util.concurrent.Future;
14
15 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
16 import org.opendaylight.controller.sal.core.api.Broker;
17 import org.opendaylight.controller.sal.core.api.BrokerService;
18 import org.opendaylight.controller.sal.core.api.Consumer;
19 import org.opendaylight.controller.sal.core.api.Provider;
20 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
21 import org.opendaylight.controller.sal.core.api.RpcImplementation;
22 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
23 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
24 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
25 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.opendaylight.yangtools.yang.common.QName;
28 import org.opendaylight.yangtools.yang.common.RpcResult;
29 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.osgi.framework.BundleContext;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import com.google.common.base.Optional;
36 import com.google.common.base.Preconditions;
37 import com.google.common.collect.ClassToInstanceMap;
38 import com.google.common.collect.ImmutableClassToInstanceMap;
39 import com.google.common.util.concurrent.ListenableFuture;
40
41 public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
42     private final static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
43
44     // Broker Generic Context
45     private final Set<ConsumerContextImpl> sessions = Collections
46             .synchronizedSet(new HashSet<ConsumerContextImpl>());
47     private final Set<ProviderContextImpl> providerSessions = Collections
48             .synchronizedSet(new HashSet<ProviderContextImpl>());
49
50     private AutoCloseable deactivator = null;
51
52     private RpcRouter router = null;
53
54     private final ClassToInstanceMap<BrokerService> services;
55
56     public  BrokerImpl(final RpcRouter router,final ClassToInstanceMap<BrokerService> services) {
57         this.router = Preconditions.checkNotNull(router, "RPC Router must not be null");
58         this.services = ImmutableClassToInstanceMap.copyOf(services);
59     }
60
61
62     @Override
63     public ConsumerSession registerConsumer(final Consumer consumer,
64             final BundleContext ctx) {
65         checkPredicates(consumer);
66         log.trace("Registering consumer {}", consumer);
67         final ConsumerContextImpl session = newSessionFor(consumer, ctx);
68         consumer.onSessionInitiated(session);
69         sessions.add(session);
70         return session;
71     }
72
73     @Override
74     public ProviderSession registerProvider(final Provider provider,
75             final BundleContext ctx) {
76         checkPredicates(provider);
77         final ProviderContextImpl session = newSessionFor(provider, ctx);
78         provider.onSessionInitiated(session);
79         providerSessions.add(session);
80         return session;
81     }
82
83     protected Future<RpcResult<CompositeNode>> invokeRpcAsync(final QName rpc,
84             final CompositeNode input) {
85         return router.invokeRpc(rpc, input);
86     }
87
88     // Validation
89     private void checkPredicates(final Provider prov) {
90         Preconditions.checkNotNull(prov, "Provider should not be null.");
91         for (ProviderContextImpl session : providerSessions) {
92             if (prov.equals(session.getProvider())) {
93                 throw new IllegalStateException("Provider already registered");
94             }
95         }
96
97     }
98
99     private void checkPredicates(final Consumer cons) {
100         Preconditions.checkNotNull(cons, "Consumer should not be null.");
101         for (ConsumerContextImpl session : sessions) {
102             if (cons.equals(session.getConsumer())) {
103                 throw new IllegalStateException("Consumer already registered");
104             }
105         }
106     }
107
108     // Private Factory methods
109     private ConsumerContextImpl newSessionFor(final Consumer provider,
110             final BundleContext ctx) {
111         ConsumerContextImpl ret = new ConsumerContextImpl(provider, this);
112         return ret;
113     }
114
115     private ProviderContextImpl newSessionFor(final Provider provider,
116             final BundleContext ctx) {
117         ProviderContextImpl ret = new ProviderContextImpl(provider, this);
118         return ret;
119     }
120
121     protected void consumerSessionClosed(
122             final ConsumerContextImpl consumerContextImpl) {
123         sessions.remove(consumerContextImpl);
124         providerSessions.remove(consumerContextImpl);
125     }
126
127     @Override
128     public void close() throws Exception {
129         if (deactivator != null) {
130             deactivator.close();
131             deactivator = null;
132         }
133     }
134
135     @Override
136     public RpcRegistration addRpcImplementation(final QName rpcType,
137             final RpcImplementation implementation)
138             throws IllegalArgumentException {
139         return router.addRpcImplementation(rpcType, implementation);
140     }
141
142     @Override
143     public RoutedRpcRegistration addRoutedRpcImplementation(
144             final QName rpcType, final RpcImplementation implementation) {
145         return router.addRoutedRpcImplementation(rpcType, implementation);
146     }
147
148     @Override
149     public void setRoutedRpcDefaultDelegate(
150             final RoutedRpcDefaultImplementation defaultImplementation) {
151         router.setRoutedRpcDefaultDelegate(defaultImplementation);
152     }
153
154     @Override
155     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(
156             final RpcRegistrationListener listener) {
157         return router.addRpcRegistrationListener(listener);
158     }
159
160     @Override
161     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
162             final L listener) {
163         return router.registerRouteChangeListener(listener);
164     }
165
166     @Override
167     public Set<QName> getSupportedRpcs() {
168         return router.getSupportedRpcs();
169     }
170
171     @Override
172     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(
173             final QName rpc, final CompositeNode input) {
174         return router.invokeRpc(rpc, input);
175     }
176
177     /**
178      * @return the deactivator
179      */
180     public AutoCloseable getDeactivator() {
181         return deactivator;
182     }
183
184     /**
185      * @param deactivator
186      *            the deactivator to set
187      */
188     public void setDeactivator(final AutoCloseable deactivator) {
189         this.deactivator = deactivator;
190     }
191
192     /**
193      * @return the router
194      */
195     public RpcRouter getRouter() {
196         return router;
197     }
198
199     /**
200      * @param router
201      *            the router to set
202      */
203     public void setRouter(final RpcRouter router) {
204         this.router = router;
205     }
206
207     protected <T extends BrokerService> Optional<T> getGlobalService(final Class<T> service) {
208         return Optional.fromNullable(services.getInstance(service));
209     }
210
211 }