Merge "Make configuration push timeout configurable"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl.connect.dom;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.InvocationHandler;
15 import java.lang.reflect.Method;
16 import java.lang.reflect.Proxy;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Set;
23 import java.util.WeakHashMap;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29
30 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
31 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
32 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
33 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
35 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
36 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
37 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
38 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
39 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
40 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
41 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
42 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
43 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
44 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
45 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
46 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
47 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
48 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
49 import org.opendaylight.controller.sal.common.util.Rpcs;
50 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
51 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
52 import org.opendaylight.controller.sal.core.api.Provider;
53 import org.opendaylight.controller.sal.core.api.RpcImplementation;
54 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
55 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.concepts.Registration;
58 import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
59 import org.opendaylight.yangtools.yang.binding.Augmentable;
60 import org.opendaylight.yangtools.yang.binding.Augmentation;
61 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
62 import org.opendaylight.yangtools.yang.binding.BindingMapping;
63 import org.opendaylight.yangtools.yang.binding.DataContainer;
64 import org.opendaylight.yangtools.yang.binding.DataObject;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.binding.RpcService;
67 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
68 import org.opendaylight.yangtools.yang.common.QName;
69 import org.opendaylight.yangtools.yang.common.RpcError;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
72 import org.opendaylight.yangtools.yang.data.api.Node;
73 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import com.google.common.base.Function;
78 import com.google.common.base.Optional;
79 import com.google.common.collect.FluentIterable;
80 import com.google.common.collect.ImmutableList;
81 import com.google.common.collect.ImmutableSet;
82 import com.google.common.collect.ImmutableSet.Builder;
83 import com.google.common.util.concurrent.Futures;
84
85 public class BindingIndependentConnector implements //
86         RuntimeDataProvider, //
87         Provider, //
88         AutoCloseable {
89
90     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
91
92     @SuppressWarnings( "deprecation")
93     private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
94
95     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
96             .builder().toInstance();
97
98     private final static Method EQUALS_METHOD;
99     
100     
101     private BindingIndependentMappingService mappingService;
102
103     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
104
105     private DataProviderService baDataService;
106
107     private ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
108     private ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
109
110     private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
111     private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
112
113     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
114
115     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
116
117     private RpcProvisionRegistry biRpcRegistry;
118     private RpcProviderRegistry baRpcRegistry;
119
120     private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
121     // private ListenerRegistration<BindingToDomRpcForwardingManager>
122     // bindingToDomRpcManager;
123
124     private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
125
126         @Override
127         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
128             return mappingService.toDataDom(input);
129         }
130
131     };
132
133     private Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> baDataReaderRegistration;
134
135     private boolean rpcForwarding = false;
136
137     private boolean dataForwarding = false;
138
139     private boolean notificationForwarding = false;
140
141     private RpcProviderRegistryImpl baRpcRegistryImpl;
142
143     private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
144     
145     
146     static {
147         try {
148         EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
149         } catch (Exception e) {
150             throw new RuntimeException(e);
151         }
152     }
153
154     @Override
155     public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
156         try {
157             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
158             CompositeNode result = biDataService.readOperationalData(biPath);
159             return potentialAugmentationRead(path, biPath, result);
160         } catch (DeserializationException e) {
161             throw new IllegalStateException(e);
162         }
163     }
164
165     private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
166             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result)
167             throws DeserializationException {
168         Class<? extends DataObject> targetType = path.getTargetType();
169         if (Augmentation.class.isAssignableFrom(targetType)) {
170             path = mappingService.fromDataDom(biPath);
171             Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
172             DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
173             if (parentTo instanceof Augmentable<?>) {
174                 return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
175             }
176         }
177         return mappingService.dataObjectFromDataDom(path, result);
178     }
179
180     @Override
181     public DataObject readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
182         try {
183             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
184             CompositeNode result = biDataService.readConfigurationData(biPath);
185             return potentialAugmentationRead(path, biPath, result);
186         } catch (DeserializationException e) {
187             throw new IllegalStateException(e);
188         }
189     }
190
191     private DataModificationTransaction createBindingToDomTransaction(
192             DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
193         DataModificationTransaction target = biDataService.beginTransaction();
194         LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(),source.getIdentifier());
195         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
196                 .entrySet()) {
197             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
198                     .toDataDom(entry);
199             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
200             LOG.debug("Update of Binding Configuration Data {} is translated to {}",entry,biEntry);
201         }
202         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
203                 .entrySet()) {
204             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
205                     .toDataDom(entry);
206             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
207             LOG.debug("Update of Binding Operational Data {} is translated to {}",entry,biEntry);
208         }
209         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
210             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
211             target.removeConfigurationData(biEntry);
212             LOG.debug("Delete of Binding Configuration Data {} is translated to {}",entry,biEntry);
213         }
214         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
215             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
216             target.removeOperationalData(biEntry);
217             LOG.debug("Delete of Binding Operational Data {} is translated to {}",entry,biEntry);
218         }
219         return target;
220     }
221
222     private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction(
223             DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> source) {
224         org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService
225                 .beginTransaction();
226         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
227                 .getUpdatedConfigurationData().entrySet()) {
228             try {
229                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
230                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
231                 target.putConfigurationData(baKey, baData);
232             } catch (DeserializationException e) {
233                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
234             }
235         }
236         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
237                 .getUpdatedOperationalData().entrySet()) {
238             try {
239
240                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
241                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
242                 target.putOperationalData(baKey, baData);
243             } catch (DeserializationException e) {
244                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
245             }
246         }
247         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) {
248             try {
249
250                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
251                 target.removeConfigurationData(baEntry);
252             } catch (DeserializationException e) {
253                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
254             }
255         }
256         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) {
257             try {
258
259                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
260                 target.removeOperationalData(baEntry);
261             } catch (DeserializationException e) {
262                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
263             }
264         }
265         return target;
266     }
267
268     public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() {
269         return biDataService;
270     }
271
272     protected void setDomDataService(org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
273         this.biDataService = biDataService;
274     }
275
276     public DataProviderService getBaDataService() {
277         return baDataService;
278     }
279
280     protected void setBindingDataService(DataProviderService baDataService) {
281         this.baDataService = baDataService;
282     }
283
284     public RpcProviderRegistry getRpcRegistry() {
285         return baRpcRegistry;
286     }
287
288     protected void setBindingRpcRegistry(RpcProviderRegistry rpcRegistry) {
289         this.baRpcRegistry = rpcRegistry;
290     }
291
292     public void startDataForwarding() {
293         checkState(!dataForwarding, "Connector is already forwarding data.");
294         baDataReaderRegistration = baDataService.registerDataReader(ROOT, this);
295         baCommitHandlerRegistration = baDataService.registerCommitHandler(ROOT, bindingToDomCommitHandler);
296         biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
297         baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
298         dataForwarding = true;
299     }
300
301     public void startRpcForwarding() {
302         if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
303             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
304             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
305             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
306                 baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
307                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
308                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
309             }
310             if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
311                 biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
312             }
313             rpcForwarding = true;
314         }
315     }
316
317     public void startNotificationForwarding() {
318         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
319         notificationForwarding = true;
320     }
321
322     protected void setMappingService(BindingIndependentMappingService mappingService) {
323         this.mappingService = mappingService;
324     }
325
326     @Override
327     public Collection<ProviderFunctionality> getProviderFunctionality() {
328         return Collections.emptyList();
329     }
330
331     @Override
332     public void onSessionInitiated(ProviderSession session) {
333         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
334         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
335
336     }
337
338     public <T extends RpcService> void onRpcRouterCreated(Class<T> serviceType, RpcRouter<T> router) {
339
340     }
341
342     public void setDomRpcRegistry(RpcProvisionRegistry registry) {
343         biRpcRegistry = registry;
344     }
345
346     @Override
347     public void close() throws Exception {
348         if (baCommitHandlerRegistration != null) {
349             baCommitHandlerRegistration.close();
350         }
351         if (biCommitHandlerRegistration != null) {
352             biCommitHandlerRegistration.close();
353         }
354
355     }
356
357     private class DomToBindingTransaction implements
358             DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
359
360         private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing;
361         private final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification;
362
363         public DomToBindingTransaction(
364                 org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing,
365                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification) {
366             super();
367             this.backing = backing;
368             this.modification = modification;
369             bindingOpenedTransactions.put(backing.getIdentifier(), this);
370         }
371
372         @Override
373         public DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> getModification() {
374             return modification;
375         }
376
377         @Override
378         public RpcResult<Void> rollback() throws IllegalStateException {
379             // backing.cancel();
380             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
381         }
382
383         @Override
384         public RpcResult<Void> finish() throws IllegalStateException {
385             Future<RpcResult<TransactionStatus>> result = backing.commit();
386             try {
387                 RpcResult<TransactionStatus> baResult = result.get();
388                 return Rpcs.<Void> getRpcResult(baResult.isSuccessful(), null, baResult.getErrors());
389             } catch (InterruptedException e) {
390                 throw new IllegalStateException("", e);
391             } catch (ExecutionException e) {
392                 throw new IllegalStateException("", e);
393             }
394         }
395     }
396
397     private class BindingToDomTransaction implements
398             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
399
400         private DataModificationTransaction backing;
401         private DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
402
403         public BindingToDomTransaction(DataModificationTransaction backing,
404                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
405             this.backing = backing;
406             this.modification = modification;
407             domOpenedTransactions.put(backing.getIdentifier(), this);
408         }
409
410         @Override
411         public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
412             return modification;
413         }
414
415         @Override
416         public RpcResult<Void> finish() throws IllegalStateException {
417             Future<RpcResult<TransactionStatus>> result = backing.commit();
418             try {
419                 RpcResult<TransactionStatus> biResult = result.get();
420                 return Rpcs.<Void> getRpcResult(biResult.isSuccessful(), null, biResult.getErrors());
421             } catch (InterruptedException e) {
422                 throw new IllegalStateException("", e);
423             } catch (ExecutionException e) {
424                 throw new IllegalStateException("", e);
425             } finally {
426                 domOpenedTransactions.remove(backing.getIdentifier());
427             }
428         }
429
430         @Override
431         public RpcResult<Void> rollback() throws IllegalStateException {
432             domOpenedTransactions.remove(backing.getIdentifier());
433             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
434         }
435     }
436
437     private class BindingToDomCommitHandler implements
438             DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
439
440         @Override
441         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
442                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> bindingTransaction) {
443
444             /**
445              * Transaction was created as DOM transaction, in that case we do
446              * not need to forward it back.
447              */
448             if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) {
449
450                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction);
451             }
452             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
453             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
454             LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
455                     domTransaction.getIdentifier());
456             return wrapped;
457         }
458     }
459
460     private class DomToBindingCommitHandler implements //
461             RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject>>, //
462             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
463
464         @Override
465         public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
466
467             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
468                     .getPath());
469
470         }
471
472         @Override
473         public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
474             // NOOP for now
475             // FIXME: do registration based on only active commit handlers.
476         }
477
478         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
479                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
480             Object identifier = domTransaction.getIdentifier();
481
482             /**
483              * We checks if the transcation was originated in this mapper. If it
484              * was originated in this mapper we are returing allways success
485              * commit hanlder to prevent creating loop in two-phase commit and
486              * duplicating data.
487              */
488             if (domOpenedTransactions.containsKey(identifier)) {
489                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction);
490             }
491
492             org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
493             DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
494             LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
495                     baTransaction.getIdentifier());
496             return forwardedTransaction;
497         }
498     }
499
500     /**
501      * Manager responsible for instantiating forwarders responsible for
502      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
503      * 
504      */
505     private class DomToBindingRpcForwardingManager implements
506             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
507             RouterInstantiationListener,
508             GlobalRpcRegistrationListener {
509
510         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
511         private RpcProviderRegistryImpl registryImpl;
512
513         public RpcProviderRegistryImpl getRegistryImpl() {
514             return registryImpl;
515         }
516
517         public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
518             this.registryImpl = registryImpl;
519         }
520         
521         @Override
522         public void onGlobalRpcRegistered(Class<? extends RpcService> cls) {
523             getRpcForwarder(cls, null);
524         }
525         
526         @Override
527         public void onGlobalRpcUnregistered(Class<? extends RpcService> cls) {
528             // NOOP
529         }
530         
531         @Override
532         public void onRpcRouterCreated(RpcRouter<?> router) {
533             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
534             getRpcForwarder(router.getServiceType(), ctx);
535         }
536
537         @Override
538         public void onRouteChange(RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
539             for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
540                 bindingRoutesAdded(entry);
541             }
542         }
543
544         private void bindingRoutesAdded(Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
545             Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
546             Class<? extends RpcService> service = entry.getKey().getRpcService();
547             if (context != null) {
548                 getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
549             }
550         }
551
552         private DomToBindingRpcForwarder getRpcForwarder(Class<? extends RpcService> service,
553                 Class<? extends BaseIdentity> context) {
554             DomToBindingRpcForwarder potential = forwarders.get(service);
555             if (potential != null) {
556                 return potential;
557             }
558             if (context == null) {
559                 potential = new DomToBindingRpcForwarder(service);
560             } else {
561                 potential = new DomToBindingRpcForwarder(service, context);
562             }
563
564             forwarders.put(service, potential);
565             return potential;
566         }
567
568     }
569
570     private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
571
572         private final Set<QName> supportedRpcs;
573         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
574         private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
575         private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
576         private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
577
578         public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
579             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
580             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
581             try {
582                 for (QName rpc : supportedRpcs) {
583                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
584                     strategiesByMethod.put(strategy.targetMethod, strategy);
585                     strategiesByQName.put(rpc, strategy);
586                     biRpcRegistry.addRpcImplementation(rpc, this);
587                 }
588
589             } catch (Exception e) {
590                 LOG.error("Could not forward Rpcs of type {}", service.getName());
591             }
592             registrations = ImmutableSet.of();
593         }
594
595         /**
596          * Constructor for Routed RPC Forwareder.
597          * 
598          * @param service
599          * @param context
600          */
601         public DomToBindingRpcForwarder(Class<? extends RpcService> service, Class<? extends BaseIdentity> context) {
602             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
603             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
604             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
605                     .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
606             try {
607                 for (QName rpc : supportedRpcs) {
608                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
609                     strategiesByMethod.put(strategy.targetMethod, strategy);
610                     strategiesByQName.put(rpc, strategy);
611                     registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
612                 }
613                 createDefaultDomForwarder();
614             } catch (Exception e) {
615                 LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
616             }
617             registrations = registrationsBuilder.build();
618         }
619
620         public void registerPaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
621                 Set<InstanceIdentifier<?>> set) {
622             QName ctx = BindingReflections.findQName(context);
623             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
624                     toDOMInstanceIdentifier)) {
625                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
626                     reg.registerPath(ctx, path);
627                 }
628             }
629         }
630
631         
632         @Override
633         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
634             if(EQUALS_METHOD.equals(method)) {
635                 return false;
636             }
637             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
638             checkState(strategy != null);
639             checkArgument(args.length <= 2);
640             if(args.length == 1) {
641                 checkArgument(args[0] instanceof DataObject);
642                 return strategy.forwardToDomBroker((DataObject) args[0]);
643             }
644             return strategy.forwardToDomBroker(null);
645         }
646
647         public void removePaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
648                 Set<InstanceIdentifier<?>> set) {
649             QName ctx = BindingReflections.findQName(context);
650             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
651                     toDOMInstanceIdentifier)) {
652                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
653                     reg.unregisterPath(ctx, path);
654                 }
655             }
656         }
657
658         @Override
659         public Set<QName> getSupportedRpcs() {
660             return supportedRpcs;
661         }
662
663         @SuppressWarnings({ "unchecked", "rawtypes" })
664         public void createDefaultDomForwarder() {
665             if (baRpcRegistryImpl != null) {
666                 Class<?> cls = rpcServiceType.get();
667                 ClassLoader clsLoader = cls.getClassLoader();
668                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
669                 
670                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
671                 rpcRouter.registerDefaultService(proxy);
672             }
673         }
674
675         @Override
676         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode domInput) {
677             checkArgument(rpc != null);
678             checkArgument(domInput != null);
679
680             Class<? extends RpcService> rpcType = rpcServiceType.get();
681             checkState(rpcType != null);
682             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
683             checkState(rpcService != null);
684             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
685             try {
686                 return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
687             } catch (Exception e) {
688                 throw new IllegalStateException(e);
689             }
690         }
691
692         private RpcInvocationStrategy resolveInvocationStrategy(QName rpc) {
693             return strategiesByQName.get(rpc);
694         }
695
696         private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
697                 final Class<? extends RpcService> rpcType) throws Exception {
698             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
699                 @Override
700                 public RpcInvocationStrategy call() throws Exception {
701                     String methodName = BindingMapping.getMethodName(rpc);
702                     Method targetMethod = null;
703                     for (Method possibleMethod : rpcType.getMethods()) {
704                         if (possibleMethod.getName().equals(methodName)
705                                 && BindingReflections.isRpcMethod(possibleMethod)) {
706                             targetMethod = possibleMethod;
707                             break;
708                         }
709                     }
710                     checkState(targetMethod != null, "Rpc method not found");
711                     Optional<Class<?>> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod);
712                     Optional<Class<? extends DataContainer>> inputClass = BindingReflections
713                             .resolveRpcInputClass(targetMethod);
714
715                     RpcInvocationStrategy strategy = null;
716                     if (outputClass.isPresent()) {
717                         if (inputClass.isPresent()) {
718                             strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get());
719                         } else {
720                             strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
721                         }
722                     } else {
723                         strategy = null;
724                     }
725                     return strategy;
726                 }
727
728             });
729         }
730     }
731
732     private abstract class RpcInvocationStrategy {
733
734         protected final Method targetMethod;
735         protected final QName rpc;
736
737         public RpcInvocationStrategy(QName rpc,Method targetMethod) {
738             this.targetMethod = targetMethod;
739             this.rpc = rpc;
740         }
741
742         public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
743
744         public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
745                 throws Exception;
746
747         public RpcResult<CompositeNode> invokeOn(RpcService rpcService, CompositeNode domInput) throws Exception {
748             return uncheckedInvoke(rpcService, domInput);
749         }
750     }
751
752     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
753
754         @SuppressWarnings("rawtypes")
755         private WeakReference<Class> inputClass;
756
757         @SuppressWarnings("rawtypes")
758         private WeakReference<Class> outputClass;
759
760         @SuppressWarnings({ "rawtypes", "unchecked" })
761         public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
762                 Class<? extends DataContainer> inputClass) {
763             super(rpc,targetMethod);
764             this.outputClass = new WeakReference(outputClass);
765             this.inputClass = new WeakReference(inputClass);
766         }
767
768         @Override
769         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
770             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
771             Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
772             if (result == null) {
773                 return Rpcs.getRpcResult(false);
774             }
775             RpcResult<?> bindingResult = result.get();
776             return Rpcs.getRpcResult(true);
777         }
778         
779         @Override
780         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
781             if(biRouter != null) { 
782                 CompositeNode xml = mappingService.toDataDom(input);
783                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
784                 RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
785                 Object baResultValue = null;
786                 if(result.getResult() != null) {
787                     baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
788                 }
789                 RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
790                 return Futures.<RpcResult<?>>immediateFuture(baResult);
791             }
792             return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
793         }
794
795     }
796
797     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
798
799         public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
800             super(rpc,targetMethod);
801         }
802
803         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
804             @SuppressWarnings("unchecked")
805             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
806             RpcResult<Void> bindingResult = result.get();
807             return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
808         }
809         
810         @Override
811         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
812             return Futures.immediateFuture(null);
813         }
814     }
815
816     public boolean isRpcForwarding() {
817         return rpcForwarding;
818     }
819
820     public boolean isDataForwarding() {
821         return dataForwarding;
822     }
823
824     public boolean isNotificationForwarding() {
825         // TODO Auto-generated method stub
826         return notificationForwarding;
827     }
828
829     public BindingIndependentMappingService getMappingService() {
830         return mappingService;
831     }
832 }