Merge "Do not rely od DataStore for all connected nodes"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl.connect.dom;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.InvocationHandler;
15 import java.lang.reflect.Method;
16 import java.lang.reflect.Proxy;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.WeakHashMap;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30
31 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
32 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
33 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
36 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
37 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
38 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
39 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
40 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
41 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
42 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
43 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
44 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
45 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
46 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
47 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
48 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
49 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
50 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
51 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
52 import org.opendaylight.controller.sal.common.util.Rpcs;
53 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
54 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
55 import org.opendaylight.controller.sal.core.api.Provider;
56 import org.opendaylight.controller.sal.core.api.RpcImplementation;
57 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
58 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
59 import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
60 import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
61 import org.opendaylight.yangtools.concepts.ListenerRegistration;
62 import org.opendaylight.yangtools.concepts.Registration;
63 import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
64 import org.opendaylight.yangtools.yang.binding.Augmentable;
65 import org.opendaylight.yangtools.yang.binding.Augmentation;
66 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
67 import org.opendaylight.yangtools.yang.binding.BindingMapping;
68 import org.opendaylight.yangtools.yang.binding.DataContainer;
69 import org.opendaylight.yangtools.yang.binding.DataObject;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
71 import org.opendaylight.yangtools.yang.binding.Notification;
72 import org.opendaylight.yangtools.yang.binding.RpcService;
73 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
74 import org.opendaylight.yangtools.yang.common.QName;
75 import org.opendaylight.yangtools.yang.common.RpcError;
76 import org.opendaylight.yangtools.yang.common.RpcResult;
77 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
78 import org.opendaylight.yangtools.yang.data.api.Node;
79 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
80 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
81 import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 import com.google.common.base.Function;
86 import com.google.common.base.Optional;
87 import com.google.common.collect.FluentIterable;
88 import com.google.common.collect.ImmutableList;
89 import com.google.common.collect.ImmutableSet;
90 import com.google.common.collect.ImmutableSet.Builder;
91 import com.google.common.util.concurrent.Futures;
92
93 public class BindingIndependentConnector implements //
94         RuntimeDataProvider, //
95         Provider, //
96         AutoCloseable {
97
98
99
100     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
101
102     @SuppressWarnings("deprecation")
103     private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
104
105     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
106             .builder().toInstance();
107
108     private final static Method EQUALS_METHOD;
109
110     private BindingIndependentMappingService mappingService;
111
112     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
113
114     private DataProviderService baDataService;
115
116     private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
117     private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
118
119     private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
120     private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
121
122     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
123
124     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
125
126     private RpcProvisionRegistry biRpcRegistry;
127     private RpcProviderRegistry baRpcRegistry;
128
129     private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
130     // private ListenerRegistration<BindingToDomRpcForwardingManager>
131     // bindingToDomRpcManager;
132
133     private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
134
135         @Override
136         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
137             return mappingService.toDataDom(input);
138         }
139
140     };
141
142     private Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> baDataReaderRegistration;
143
144     private boolean rpcForwarding = false;
145
146     private boolean dataForwarding = false;
147
148     private boolean notificationForwarding = false;
149
150     private RpcProviderRegistryImpl baRpcRegistryImpl;
151
152     private NotificationProviderService baNotifyService;
153
154     private NotificationPublishService domNotificationService;
155
156     static {
157         try {
158             EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
159         } catch (Exception e) {
160             throw new RuntimeException(e);
161         }
162     }
163
164     @Override
165     public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
166         try {
167             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
168             CompositeNode result = biDataService.readOperationalData(biPath);
169             return potentialAugmentationRead(path, biPath, result);
170         } catch (DeserializationException e) {
171             throw new IllegalStateException(e);
172         }
173     }
174
175     private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
176             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result)
177             throws DeserializationException {
178         Class<? extends DataObject> targetType = path.getTargetType();
179         if (Augmentation.class.isAssignableFrom(targetType)) {
180             path = mappingService.fromDataDom(biPath);
181             Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
182             DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
183             if (parentTo instanceof Augmentable<?>) {
184                 return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
185             }
186         }
187         return mappingService.dataObjectFromDataDom(path, result);
188     }
189
190     @Override
191     public DataObject readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
192         try {
193             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
194             CompositeNode result = biDataService.readConfigurationData(biPath);
195             return potentialAugmentationRead(path, biPath, result);
196         } catch (DeserializationException e) {
197             throw new IllegalStateException(e);
198         }
199     }
200
201     private DataModificationTransaction createBindingToDomTransaction(
202             DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
203         DataModificationTransaction target = biDataService.beginTransaction();
204         LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(),source.getIdentifier());
205         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
206                 .entrySet()) {
207             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
208                     .toDataDom(entry);
209             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
210             LOG.debug("Update of Binding Configuration Data {} is translated to {}",entry,biEntry);
211         }
212         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
213                 .entrySet()) {
214             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
215                     .toDataDom(entry);
216             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
217             LOG.debug("Update of Binding Operational Data {} is translated to {}",entry,biEntry);
218         }
219         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
220             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
221             target.removeConfigurationData(biEntry);
222             LOG.debug("Delete of Binding Configuration Data {} is translated to {}",entry,biEntry);
223         }
224         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
225             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
226             target.removeOperationalData(biEntry);
227             LOG.debug("Delete of Binding Operational Data {} is translated to {}",entry,biEntry);
228         }
229         return target;
230     }
231
232     private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction(
233             DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> source) {
234         org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService
235                 .beginTransaction();
236         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
237                 .getUpdatedConfigurationData().entrySet()) {
238             try {
239                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
240                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
241                 target.putConfigurationData(baKey, baData);
242             } catch (DeserializationException e) {
243                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
244             }
245         }
246         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
247                 .getUpdatedOperationalData().entrySet()) {
248             try {
249
250                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
251                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
252                 target.putOperationalData(baKey, baData);
253             } catch (DeserializationException e) {
254                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
255             }
256         }
257         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) {
258             try {
259
260                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
261                 target.removeConfigurationData(baEntry);
262             } catch (DeserializationException e) {
263                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
264             }
265         }
266         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) {
267             try {
268
269                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
270                 target.removeOperationalData(baEntry);
271             } catch (DeserializationException e) {
272                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
273             }
274         }
275         return target;
276     }
277
278     public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() {
279         return biDataService;
280     }
281
282     protected void setDomDataService(org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
283         this.biDataService = biDataService;
284     }
285
286     public DataProviderService getBaDataService() {
287         return baDataService;
288     }
289
290     protected void setBindingDataService(DataProviderService baDataService) {
291         this.baDataService = baDataService;
292     }
293
294     public RpcProviderRegistry getRpcRegistry() {
295         return baRpcRegistry;
296     }
297
298     protected void setBindingRpcRegistry(RpcProviderRegistry rpcRegistry) {
299         this.baRpcRegistry = rpcRegistry;
300     }
301
302     public void startDataForwarding() {
303         checkState(!dataForwarding, "Connector is already forwarding data.");
304         baDataReaderRegistration = baDataService.registerDataReader(ROOT, this);
305         baCommitHandlerRegistration = baDataService.registerCommitHandler(ROOT, bindingToDomCommitHandler);
306         biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
307         baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
308         dataForwarding = true;
309     }
310
311     public void startRpcForwarding() {
312         if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
313             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
314             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
315             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
316                 baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
317                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
318                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
319             }
320             rpcForwarding = true;
321         }
322     }
323
324     public void startNotificationForwarding() {
325         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
326         if (baNotifyService != null && domNotificationService != null) {
327             baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder());
328
329             notificationForwarding = true;
330         }
331     }
332
333     protected void setMappingService(BindingIndependentMappingService mappingService) {
334         this.mappingService = mappingService;
335     }
336
337     @Override
338     public Collection<ProviderFunctionality> getProviderFunctionality() {
339         return Collections.emptyList();
340     }
341
342     @Override
343     public void onSessionInitiated(ProviderSession session) {
344         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
345         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
346
347     }
348
349     public <T extends RpcService> void onRpcRouterCreated(Class<T> serviceType, RpcRouter<T> router) {
350
351     }
352
353     public void setDomRpcRegistry(RpcProvisionRegistry registry) {
354         biRpcRegistry = registry;
355     }
356
357     @Override
358     public void close() throws Exception {
359         if (baCommitHandlerRegistration != null) {
360             baCommitHandlerRegistration.close();
361         }
362         if (biCommitHandlerRegistration != null) {
363             biCommitHandlerRegistration.close();
364         }
365
366     }
367
368     private class DomToBindingTransaction implements
369             DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
370
371         private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing;
372         private final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification;
373
374         public DomToBindingTransaction(
375                 org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing,
376                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification) {
377             super();
378             this.backing = backing;
379             this.modification = modification;
380             bindingOpenedTransactions.put(backing.getIdentifier(), this);
381         }
382
383         @Override
384         public DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> getModification() {
385             return modification;
386         }
387
388         @Override
389         public RpcResult<Void> rollback() throws IllegalStateException {
390             // backing.cancel();
391             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
392         }
393
394         @Override
395         public RpcResult<Void> finish() throws IllegalStateException {
396             Future<RpcResult<TransactionStatus>> result = backing.commit();
397             try {
398                 RpcResult<TransactionStatus> baResult = result.get();
399                 return Rpcs.<Void> getRpcResult(baResult.isSuccessful(), null, baResult.getErrors());
400             } catch (InterruptedException e) {
401                 throw new IllegalStateException("", e);
402             } catch (ExecutionException e) {
403                 throw new IllegalStateException("", e);
404             }
405         }
406     }
407
408     private class BindingToDomTransaction implements
409             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
410
411         private final DataModificationTransaction backing;
412         private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
413
414         public BindingToDomTransaction(DataModificationTransaction backing,
415                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
416             this.backing = backing;
417             this.modification = modification;
418             domOpenedTransactions.put(backing.getIdentifier(), this);
419         }
420
421         @Override
422         public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
423             return modification;
424         }
425
426         @Override
427         public RpcResult<Void> finish() throws IllegalStateException {
428             Future<RpcResult<TransactionStatus>> result = backing.commit();
429             try {
430                 RpcResult<TransactionStatus> biResult = result.get();
431                 return Rpcs.<Void> getRpcResult(biResult.isSuccessful(), null, biResult.getErrors());
432             } catch (InterruptedException e) {
433                 throw new IllegalStateException("", e);
434             } catch (ExecutionException e) {
435                 throw new IllegalStateException("", e);
436             } finally {
437                 domOpenedTransactions.remove(backing.getIdentifier());
438             }
439         }
440
441         @Override
442         public RpcResult<Void> rollback() throws IllegalStateException {
443             domOpenedTransactions.remove(backing.getIdentifier());
444             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
445         }
446     }
447
448     private class BindingToDomCommitHandler implements
449             DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
450
451         @Override
452         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
453                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> bindingTransaction) {
454
455             /**
456              * Transaction was created as DOM transaction, in that case we do
457              * not need to forward it back.
458              */
459             if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) {
460
461                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction);
462             }
463             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
464             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
465             LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
466                     domTransaction.getIdentifier());
467             return wrapped;
468         }
469     }
470
471     private class DomToBindingCommitHandler implements //
472             RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
473             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
474
475         @Override
476         public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
477
478             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
479                     .getPath());
480
481         }
482
483         @Override
484         public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
485             // NOOP for now
486             // FIXME: do registration based on only active commit handlers.
487         }
488
489         @Override
490         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
491                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
492             Object identifier = domTransaction.getIdentifier();
493
494             /**
495              * We checks if the transcation was originated in this mapper. If it
496              * was originated in this mapper we are returing allways success
497              * commit hanlder to prevent creating loop in two-phase commit and
498              * duplicating data.
499              */
500             if (domOpenedTransactions.containsKey(identifier)) {
501                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction);
502             }
503
504             org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
505             DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
506             LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
507                     baTransaction.getIdentifier());
508             return forwardedTransaction;
509         }
510     }
511
512     /**
513      * Manager responsible for instantiating forwarders responsible for
514      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
515      *
516      */
517     private class DomToBindingRpcForwardingManager implements
518             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
519             RouterInstantiationListener,
520             GlobalRpcRegistrationListener {
521
522         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
523         private RpcProviderRegistryImpl registryImpl;
524
525         public RpcProviderRegistryImpl getRegistryImpl() {
526             return registryImpl;
527         }
528
529         public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
530             this.registryImpl = registryImpl;
531         }
532
533         @Override
534         public void onGlobalRpcRegistered(Class<? extends RpcService> cls) {
535             getRpcForwarder(cls, null);
536         }
537
538         @Override
539         public void onGlobalRpcUnregistered(Class<? extends RpcService> cls) {
540             // NOOP
541         }
542
543         @Override
544         public void onRpcRouterCreated(RpcRouter<?> router) {
545             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
546             getRpcForwarder(router.getServiceType(), ctx);
547         }
548
549         @Override
550         public void onRouteChange(RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
551             for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
552                 bindingRoutesAdded(entry);
553             }
554         }
555
556         private void bindingRoutesAdded(Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
557             Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
558             Class<? extends RpcService> service = entry.getKey().getRpcService();
559             if (context != null) {
560                 getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
561             }
562         }
563
564         private DomToBindingRpcForwarder getRpcForwarder(Class<? extends RpcService> service,
565                 Class<? extends BaseIdentity> context) {
566             DomToBindingRpcForwarder potential = forwarders.get(service);
567             if (potential != null) {
568                 return potential;
569             }
570             if (context == null) {
571                 potential = new DomToBindingRpcForwarder(service);
572             } else {
573                 potential = new DomToBindingRpcForwarder(service, context);
574             }
575
576             forwarders.put(service, potential);
577             return potential;
578         }
579
580     }
581
582     private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
583
584         private final Set<QName> supportedRpcs;
585         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
586         private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
587         private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
588         private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
589
590         public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
591             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
592             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
593             try {
594                 for (QName rpc : supportedRpcs) {
595                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
596                     strategiesByMethod.put(strategy.targetMethod, strategy);
597                     strategiesByQName.put(rpc, strategy);
598                     biRpcRegistry.addRpcImplementation(rpc, this);
599                 }
600
601             } catch (Exception e) {
602                 LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
603             }
604             registrations = ImmutableSet.of();
605         }
606
607         /**
608          * Constructor for Routed RPC Forwareder.
609          *
610          * @param service
611          * @param context
612          */
613         public DomToBindingRpcForwarder(Class<? extends RpcService> service, Class<? extends BaseIdentity> context) {
614             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
615             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
616             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
617                     .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
618             try {
619                 for (QName rpc : supportedRpcs) {
620                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
621                     strategiesByMethod.put(strategy.targetMethod, strategy);
622                     strategiesByQName.put(rpc, strategy);
623                     registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
624                 }
625                 createDefaultDomForwarder();
626             } catch (Exception e) {
627                 LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
628             }
629             registrations = registrationsBuilder.build();
630         }
631
632         public void registerPaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
633                 Set<InstanceIdentifier<?>> set) {
634             QName ctx = BindingReflections.findQName(context);
635             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
636                     toDOMInstanceIdentifier)) {
637                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
638                     reg.registerPath(ctx, path);
639                 }
640             }
641         }
642
643
644         @Override
645         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
646             if (EQUALS_METHOD.equals(method)) {
647                 return false;
648             }
649             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
650             checkState(strategy != null);
651             checkArgument(args.length <= 2);
652             if (args.length == 1) {
653                 checkArgument(args[0] instanceof DataObject);
654                 return strategy.forwardToDomBroker((DataObject) args[0]);
655             }
656             return strategy.forwardToDomBroker(null);
657         }
658
659         public void removePaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
660                 Set<InstanceIdentifier<?>> set) {
661             QName ctx = BindingReflections.findQName(context);
662             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
663                     toDOMInstanceIdentifier)) {
664                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
665                     reg.unregisterPath(ctx, path);
666                 }
667             }
668         }
669
670         @Override
671         public Set<QName> getSupportedRpcs() {
672             return supportedRpcs;
673         }
674
675         @SuppressWarnings({ "unchecked", "rawtypes" })
676         public void createDefaultDomForwarder() {
677             if (baRpcRegistryImpl != null) {
678                 Class<?> cls = rpcServiceType.get();
679                 ClassLoader clsLoader = cls.getClassLoader();
680                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
681
682                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
683                 rpcRouter.registerDefaultService(proxy);
684             }
685         }
686
687         @Override
688         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode domInput) {
689             checkArgument(rpc != null);
690             checkArgument(domInput != null);
691
692             Class<? extends RpcService> rpcType = rpcServiceType.get();
693             checkState(rpcType != null);
694             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
695             checkState(rpcService != null);
696             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
697             try {
698                 return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
699             } catch (Exception e) {
700                 throw new IllegalStateException(e);
701             }
702         }
703
704         private RpcInvocationStrategy resolveInvocationStrategy(QName rpc) {
705             return strategiesByQName.get(rpc);
706         }
707
708         private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
709                 final Class<? extends RpcService> rpcType) throws Exception {
710             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
711                 @Override
712                 public RpcInvocationStrategy call() throws Exception {
713                     String methodName = BindingMapping.getMethodName(rpc);
714                     Method targetMethod = null;
715                     for (Method possibleMethod : rpcType.getMethods()) {
716                         if (possibleMethod.getName().equals(methodName)
717                                 && BindingReflections.isRpcMethod(possibleMethod)) {
718                             targetMethod = possibleMethod;
719                             break;
720                         }
721                     }
722                     checkState(targetMethod != null, "Rpc method not found");
723                     Optional<Class<?>> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod);
724                     Optional<Class<? extends DataContainer>> inputClass = BindingReflections
725                             .resolveRpcInputClass(targetMethod);
726
727                     RpcInvocationStrategy strategy = null;
728                     if (outputClass.isPresent()) {
729                         if (inputClass.isPresent()) {
730                             strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
731                                     .get());
732                         } else {
733                             strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
734                         }
735                     } else if(inputClass.isPresent()){
736                         strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get());
737                     } else {
738                         strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
739                     }
740                     return strategy;
741                 }
742
743             });
744         }
745     }
746
747     private abstract class RpcInvocationStrategy {
748
749         protected final Method targetMethod;
750         protected final QName rpc;
751
752         public RpcInvocationStrategy(QName rpc, Method targetMethod) {
753             this.targetMethod = targetMethod;
754             this.rpc = rpc;
755         }
756
757         public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
758
759         public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
760                 throws Exception;
761
762         public RpcResult<CompositeNode> invokeOn(RpcService rpcService, CompositeNode domInput) throws Exception {
763             return uncheckedInvoke(rpcService, domInput);
764         }
765     }
766
767     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
768
769         @SuppressWarnings("rawtypes")
770         private final WeakReference<Class> inputClass;
771
772         @SuppressWarnings("rawtypes")
773         private final WeakReference<Class> outputClass;
774
775         @SuppressWarnings({ "rawtypes", "unchecked" })
776         public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
777                 Class<? extends DataContainer> inputClass) {
778             super(rpc, targetMethod);
779             this.outputClass = new WeakReference(outputClass);
780             this.inputClass = new WeakReference(inputClass);
781         }
782
783         @SuppressWarnings("unchecked")
784         @Override
785         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
786             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
787             Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
788             if (futureResult == null) {
789                 return Rpcs.getRpcResult(false);
790             }
791             RpcResult<?> bindingResult = futureResult.get();
792             final Object resultObj = bindingResult.getResult();
793             if (resultObj instanceof DataObject) {
794                 final CompositeNode output = mappingService.toDataDom((DataObject)resultObj);
795                 return Rpcs.getRpcResult(true, output, Collections.<RpcError>emptySet());
796             }
797             return Rpcs.getRpcResult(true);
798         }
799
800         @Override
801         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
802             if(biRpcRegistry != null) {
803                 CompositeNode xml = mappingService.toDataDom(input);
804                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
805                 RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
806                 Object baResultValue = null;
807                 if (result.getResult() != null) {
808                     baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
809                 }
810                 RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
811                 return Futures.<RpcResult<?>> immediateFuture(baResult);
812             }
813             return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
814         }
815
816     }
817
818     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
819
820         public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
821             super(rpc, targetMethod);
822         }
823
824         @Override
825         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
826             @SuppressWarnings("unchecked")
827             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
828             RpcResult<Void> bindingResult = result.get();
829             return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
830         }
831
832         @Override
833         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
834             return Futures.immediateFuture(null);
835         }
836     }
837
838     private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
839
840
841         @SuppressWarnings("rawtypes")
842         private final WeakReference<Class> inputClass;
843
844         @SuppressWarnings({ "rawtypes", "unchecked" })
845         public NoOutputInvocationStrategy(QName rpc, Method targetMethod,
846                 Class<? extends DataContainer> inputClass) {
847             super(rpc,targetMethod);
848             this.inputClass = new WeakReference(inputClass);
849         }
850
851
852         @Override
853         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
854             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
855             Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
856             if (result == null) {
857                 return Rpcs.getRpcResult(false);
858             }
859             RpcResult<?> bindingResult = result.get();
860             return Rpcs.getRpcResult(true);
861         }
862
863         @Override
864         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
865             if(biRpcRegistry != null) {
866                 CompositeNode xml = mappingService.toDataDom(input);
867                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
868                 RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
869                 Object baResultValue = null;
870                 RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
871                 return Futures.<RpcResult<?>>immediateFuture(baResult);
872             }
873             return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
874         }
875
876     }
877
878     public boolean isRpcForwarding() {
879         return rpcForwarding;
880     }
881
882     public boolean isDataForwarding() {
883         return dataForwarding;
884     }
885
886     public boolean isNotificationForwarding() {
887         return notificationForwarding;
888     }
889
890     public BindingIndependentMappingService getMappingService() {
891         return mappingService;
892     }
893
894     public void setBindingNotificationService(NotificationProviderService baService) {
895         this.baNotifyService = baService;
896
897     }
898
899     public void setDomNotificationService(NotificationPublishService domService) {
900         this.domNotificationService = domService;
901     }
902
903     private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
904
905         private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
906         private final Set<QName> supportedNotifications = new HashSet<>();
907
908         @Override
909         public Set<QName> getSupportedNotifications() {
910             return Collections.unmodifiableSet(supportedNotifications);
911         }
912
913         @Override
914         public void onNotification(CompositeNode notification) {
915             QName qname = notification.getNodeType();
916             WeakReference<Class<? extends Notification>> potential = notifications.get(qname);
917             if (potential != null) {
918                 Class<? extends Notification> potentialClass = potential.get();
919                 if (potentialClass != null) {
920                     final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
921                             notification);
922
923                     if (baNotification instanceof Notification) {
924                         baNotifyService.publish((Notification) baNotification);
925                     }
926                 }
927             }
928         }
929
930         @Override
931         public void onNotificationSubscribtion(Class<? extends Notification> notificationType) {
932             QName qname = BindingReflections.findQName(notificationType);
933             if (qname != null) {
934                 WeakReference<Class<? extends Notification>> already = notifications.putIfAbsent(qname,
935                         new WeakReference<Class<? extends Notification>>(notificationType));
936                 if (already == null) {
937                     domNotificationService.addNotificationListener(qname, this);
938                     supportedNotifications.add(qname);
939                 }
940             }
941         }
942     }
943 }