Merge "Add lispflowmapping specific configuration options"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl.connect.dom;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.InvocationHandler;
15 import java.lang.reflect.Method;
16 import java.lang.reflect.Proxy;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.WeakHashMap;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30
31 import org.opendaylight.controller.md.sal.binding.impl.AbstractForwardedDataBroker;
32 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
33 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
37 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
38 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
39 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
40 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
41 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
42 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
43 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
44 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
45 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
46 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
47 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
48 import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
49 import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl;
50 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
51 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
52 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
53 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
54 import org.opendaylight.controller.sal.common.util.Rpcs;
55 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
56 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
57 import org.opendaylight.controller.sal.core.api.Provider;
58 import org.opendaylight.controller.sal.core.api.RpcImplementation;
59 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
60 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
61 import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
62 import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.concepts.Registration;
65 import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
66 import org.opendaylight.yangtools.yang.binding.Augmentable;
67 import org.opendaylight.yangtools.yang.binding.Augmentation;
68 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
69 import org.opendaylight.yangtools.yang.binding.BindingMapping;
70 import org.opendaylight.yangtools.yang.binding.DataContainer;
71 import org.opendaylight.yangtools.yang.binding.DataObject;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.binding.Notification;
74 import org.opendaylight.yangtools.yang.binding.RpcService;
75 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
76 import org.opendaylight.yangtools.yang.common.QName;
77 import org.opendaylight.yangtools.yang.common.RpcError;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
80 import org.opendaylight.yangtools.yang.data.api.Node;
81 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
82 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
83 import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
84 import org.slf4j.Logger;
85 import org.slf4j.LoggerFactory;
86
87 import com.google.common.base.Function;
88 import com.google.common.base.Optional;
89 import com.google.common.collect.FluentIterable;
90 import com.google.common.collect.ImmutableList;
91 import com.google.common.collect.ImmutableSet;
92 import com.google.common.collect.ImmutableSet.Builder;
93 import com.google.common.util.concurrent.Futures;
94 import com.google.common.util.concurrent.ListenableFuture;
95
96 public class BindingIndependentConnector implements //
97         RuntimeDataProvider, //
98         Provider, //
99         AutoCloseable {
100
101     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
102
103     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
104             .builder().toInstance();
105
106     private final static Method EQUALS_METHOD;
107
108     private BindingIndependentMappingService mappingService;
109
110     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
111
112     private DataProviderService baDataService;
113
114     private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
115     private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
116
117     private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
118     private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
119
120     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
121
122     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
123
124     private RpcProvisionRegistry biRpcRegistry;
125     private RpcProviderRegistry baRpcRegistry;
126
127     private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
128     // private ListenerRegistration<BindingToDomRpcForwardingManager>
129     // bindingToDomRpcManager;
130
131     private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
132
133         @Override
134         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(final InstanceIdentifier<?> input) {
135             return mappingService.toDataDom(input);
136         }
137
138     };
139
140     private boolean rpcForwarding = false;
141
142     private boolean dataForwarding = false;
143
144     private boolean notificationForwarding = false;
145
146     private RpcProviderRegistryImpl baRpcRegistryImpl;
147
148     private NotificationProviderService baNotifyService;
149
150     private NotificationPublishService domNotificationService;
151
152     static {
153         try {
154             EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
155         } catch (Exception e) {
156             throw new RuntimeException(e);
157         }
158     }
159
160     @Override
161     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
162         try {
163             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
164             CompositeNode result = biDataService.readOperationalData(biPath);
165             return potentialAugmentationRead(path, biPath, result);
166         } catch (DeserializationException e) {
167             throw new IllegalStateException(e);
168         }
169     }
170
171     private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
172             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, final CompositeNode result)
173             throws DeserializationException {
174         Class<? extends DataObject> targetType = path.getTargetType();
175         if (Augmentation.class.isAssignableFrom(targetType)) {
176             path = mappingService.fromDataDom(biPath);
177             Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
178             DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
179             if (parentTo instanceof Augmentable<?>) {
180                 return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
181             }
182         }
183         return mappingService.dataObjectFromDataDom(path, result);
184     }
185
186     @Override
187     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
188         try {
189             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
190             CompositeNode result = biDataService.readConfigurationData(biPath);
191             return potentialAugmentationRead(path, biPath, result);
192         } catch (DeserializationException e) {
193             throw new IllegalStateException(e);
194         }
195     }
196
197     private DataModificationTransaction createBindingToDomTransaction(
198             final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
199         DataModificationTransaction target = biDataService.beginTransaction();
200         LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier());
201         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
202             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
203             target.removeConfigurationData(biEntry);
204             LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry);
205         }
206         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
207             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
208             target.removeOperationalData(biEntry);
209             LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry);
210         }
211         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
212                 .entrySet()) {
213             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
214                     .toDataDom(entry);
215             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
216             LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry);
217         }
218         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
219                 .entrySet()) {
220             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
221                     .toDataDom(entry);
222             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
223             LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry);
224         }
225
226         return target;
227     }
228
229     private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction(
230             final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> source) {
231         org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService
232                 .beginTransaction();
233         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) {
234             try {
235
236                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
237                 target.removeConfigurationData(baEntry);
238             } catch (DeserializationException e) {
239                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
240             }
241         }
242         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) {
243             try {
244
245                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
246                 target.removeOperationalData(baEntry);
247             } catch (DeserializationException e) {
248                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
249             }
250         }
251         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
252                 .getUpdatedConfigurationData().entrySet()) {
253             try {
254                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
255                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
256                 target.putConfigurationData(baKey, baData);
257             } catch (DeserializationException e) {
258                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
259             }
260         }
261         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
262                 .getUpdatedOperationalData().entrySet()) {
263             try {
264
265                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
266                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
267                 target.putOperationalData(baKey, baData);
268             } catch (DeserializationException e) {
269                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
270             }
271         }
272         return target;
273     }
274
275     public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() {
276         return biDataService;
277     }
278
279     protected void setDomDataService(
280             final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
281         this.biDataService = biDataService;
282     }
283
284     public DataProviderService getBaDataService() {
285         return baDataService;
286     }
287
288     protected void setBindingDataService(final DataProviderService baDataService) {
289         this.baDataService = baDataService;
290     }
291
292     public RpcProviderRegistry getRpcRegistry() {
293         return baRpcRegistry;
294     }
295
296     protected void setBindingRpcRegistry(final RpcProviderRegistry rpcRegistry) {
297         this.baRpcRegistry = rpcRegistry;
298     }
299
300     public void startDataForwarding() {
301         if (baDataService instanceof AbstractForwardedDataBroker) {
302             dataForwarding = true;
303             return;
304         }
305
306         final DataProviderService baData;
307         if (baDataService instanceof BindingMountPointImpl) {
308             baData = ((BindingMountPointImpl) baDataService).getDataBrokerImpl();
309             LOG.debug("Extracted BA Data provider {} from mount point {}", baData, baDataService);
310         } else {
311             baData = baDataService;
312         }
313
314         if (baData instanceof DataBrokerImpl) {
315             checkState(!dataForwarding, "Connector is already forwarding data.");
316             ((DataBrokerImpl) baData).setDataReadDelegate(this);
317             ((DataBrokerImpl) baData).setRootCommitHandler(bindingToDomCommitHandler);
318             biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
319             baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
320         }
321
322         dataForwarding = true;
323     }
324
325     public void startRpcForwarding() {
326         if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
327             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
328             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
329             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
330                 baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
331                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
332                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
333             }
334             rpcForwarding = true;
335         }
336     }
337
338     public void startNotificationForwarding() {
339         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
340         if (baNotifyService != null && domNotificationService != null) {
341             baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder());
342
343             notificationForwarding = true;
344         }
345     }
346
347     protected void setMappingService(final BindingIndependentMappingService mappingService) {
348         this.mappingService = mappingService;
349     }
350
351     @Override
352     public Collection<ProviderFunctionality> getProviderFunctionality() {
353         return Collections.emptyList();
354     }
355
356     @Override
357     public void onSessionInitiated(final ProviderSession session) {
358         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
359         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
360
361     }
362
363     public <T extends RpcService> void onRpcRouterCreated(final Class<T> serviceType, final RpcRouter<T> router) {
364
365     }
366
367     public void setDomRpcRegistry(final RpcProvisionRegistry registry) {
368         biRpcRegistry = registry;
369     }
370
371     @Override
372     public void close() throws Exception {
373         if (baCommitHandlerRegistration != null) {
374             baCommitHandlerRegistration.close();
375         }
376         if (biCommitHandlerRegistration != null) {
377             biCommitHandlerRegistration.close();
378         }
379
380     }
381
382     private class DomToBindingTransaction implements
383             DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
384
385         private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing;
386         private final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification;
387
388         public DomToBindingTransaction(
389                 final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing,
390                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification) {
391             super();
392             this.backing = backing;
393             this.modification = modification;
394             bindingOpenedTransactions.put(backing.getIdentifier(), this);
395         }
396
397         @Override
398         public DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> getModification() {
399             return modification;
400         }
401
402         @Override
403         public RpcResult<Void> rollback() throws IllegalStateException {
404             // backing.cancel();
405             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
406         }
407
408         @Override
409         public RpcResult<Void> finish() throws IllegalStateException {
410             Future<RpcResult<TransactionStatus>> result = backing.commit();
411             try {
412                 RpcResult<TransactionStatus> baResult = result.get();
413                 return Rpcs.<Void> getRpcResult(baResult.isSuccessful(), null, baResult.getErrors());
414             } catch (InterruptedException e) {
415                 throw new IllegalStateException("", e);
416             } catch (ExecutionException e) {
417                 throw new IllegalStateException("", e);
418             }
419         }
420     }
421
422     private class BindingToDomTransaction implements
423             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
424
425         private final DataModificationTransaction backing;
426         private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
427
428         public BindingToDomTransaction(final DataModificationTransaction backing,
429                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
430             this.backing = backing;
431             this.modification = modification;
432             domOpenedTransactions.put(backing.getIdentifier(), this);
433         }
434
435         @Override
436         public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
437             return modification;
438         }
439
440         @Override
441         public RpcResult<Void> finish() throws IllegalStateException {
442             Future<RpcResult<TransactionStatus>> result = backing.commit();
443             try {
444                 RpcResult<TransactionStatus> biResult = result.get();
445                 return Rpcs.<Void> getRpcResult(biResult.isSuccessful(), null, biResult.getErrors());
446             } catch (InterruptedException e) {
447                 throw new IllegalStateException("", e);
448             } catch (ExecutionException e) {
449                 throw new IllegalStateException("", e);
450             } finally {
451                 domOpenedTransactions.remove(backing.getIdentifier());
452             }
453         }
454
455         @Override
456         public RpcResult<Void> rollback() throws IllegalStateException {
457             domOpenedTransactions.remove(backing.getIdentifier());
458             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
459         }
460     }
461
462     private class BindingToDomCommitHandler implements
463             DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
464
465         @Override
466         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
467                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> bindingTransaction) {
468
469             /**
470              * Transaction was created as DOM transaction, in that case we do
471              * not need to forward it back.
472              */
473             if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) {
474
475                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction);
476             }
477             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
478             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
479             LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .",
480                     bindingTransaction.getIdentifier(), domTransaction.getIdentifier());
481             return wrapped;
482         }
483     }
484
485     private class DomToBindingCommitHandler implements //
486             RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
487             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
488
489         @Override
490         public void onRegister(
491                 final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
492
493             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
494                     .getPath());
495
496         }
497
498         @Override
499         public void onUnregister(
500                 final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
501             // NOOP for now
502             // FIXME: do registration based on only active commit handlers.
503         }
504
505         @Override
506         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
507                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
508             Object identifier = domTransaction.getIdentifier();
509
510             /**
511              * We checks if the transcation was originated in this mapper. If it
512              * was originated in this mapper we are returing allways success
513              * commit hanlder to prevent creating loop in two-phase commit and
514              * duplicating data.
515              */
516             if (domOpenedTransactions.containsKey(identifier)) {
517                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction);
518             }
519
520             org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
521             DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
522             LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
523                     baTransaction.getIdentifier());
524             return forwardedTransaction;
525         }
526     }
527
528     /**
529      * Manager responsible for instantiating forwarders responsible for
530      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
531      *
532      */
533     private class DomToBindingRpcForwardingManager implements
534             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>, RouterInstantiationListener,
535             GlobalRpcRegistrationListener {
536
537         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
538         private RpcProviderRegistryImpl registryImpl;
539
540         public RpcProviderRegistryImpl getRegistryImpl() {
541             return registryImpl;
542         }
543
544         public void setRegistryImpl(final RpcProviderRegistryImpl registryImpl) {
545             this.registryImpl = registryImpl;
546         }
547
548         @Override
549         public void onGlobalRpcRegistered(final Class<? extends RpcService> cls) {
550             getRpcForwarder(cls, null);
551         }
552
553         @Override
554         public void onGlobalRpcUnregistered(final Class<? extends RpcService> cls) {
555             // NOOP
556         }
557
558         @Override
559         public void onRpcRouterCreated(final RpcRouter<?> router) {
560             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
561             getRpcForwarder(router.getServiceType(), ctx);
562         }
563
564         @Override
565         public void onRouteChange(final RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
566             for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
567                 bindingRoutesAdded(entry);
568             }
569         }
570
571         private void bindingRoutesAdded(final Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
572             Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
573             Class<? extends RpcService> service = entry.getKey().getRpcService();
574             if (context != null) {
575                 getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
576             }
577         }
578
579         private DomToBindingRpcForwarder getRpcForwarder(final Class<? extends RpcService> service,
580                 final Class<? extends BaseIdentity> context) {
581             DomToBindingRpcForwarder potential = forwarders.get(service);
582             if (potential != null) {
583                 return potential;
584             }
585             if (context == null) {
586                 potential = new DomToBindingRpcForwarder(service);
587             } else {
588                 potential = new DomToBindingRpcForwarder(service, context);
589             }
590
591             forwarders.put(service, potential);
592             return potential;
593         }
594
595     }
596
597     private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
598
599         private final Set<QName> supportedRpcs;
600         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
601         private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
602         private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
603         private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
604
605         public DomToBindingRpcForwarder(final Class<? extends RpcService> service) {
606             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
607             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
608             try {
609                 for (QName rpc : supportedRpcs) {
610                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
611                     strategiesByMethod.put(strategy.targetMethod, strategy);
612                     strategiesByQName.put(rpc, strategy);
613                     biRpcRegistry.addRpcImplementation(rpc, this);
614                 }
615
616             } catch (Exception e) {
617                 LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
618             }
619             registrations = ImmutableSet.of();
620         }
621
622         /**
623          * Constructor for Routed RPC Forwareder.
624          *
625          * @param service
626          * @param context
627          */
628         public DomToBindingRpcForwarder(final Class<? extends RpcService> service,
629                 final Class<? extends BaseIdentity> context) {
630             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
631             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
632             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
633                     .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
634             try {
635                 for (QName rpc : supportedRpcs) {
636                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
637                     strategiesByMethod.put(strategy.targetMethod, strategy);
638                     strategiesByQName.put(rpc, strategy);
639                     registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
640                 }
641                 createDefaultDomForwarder();
642             } catch (Exception e) {
643                 LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
644             }
645             registrations = registrationsBuilder.build();
646         }
647
648         public void registerPaths(final Class<? extends BaseIdentity> context,
649                 final Class<? extends RpcService> service, final Set<InstanceIdentifier<?>> set) {
650             QName ctx = BindingReflections.findQName(context);
651             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
652                     toDOMInstanceIdentifier)) {
653                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
654                     reg.registerPath(ctx, path);
655                 }
656             }
657         }
658
659         @Override
660         public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
661             if (EQUALS_METHOD.equals(method)) {
662                 return false;
663             }
664             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
665             checkState(strategy != null);
666             checkArgument(args.length <= 2);
667             if (args.length == 1) {
668                 checkArgument(args[0] instanceof DataObject);
669                 return strategy.forwardToDomBroker((DataObject) args[0]);
670             }
671             return strategy.forwardToDomBroker(null);
672         }
673
674         public void removePaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
675                 final Set<InstanceIdentifier<?>> set) {
676             QName ctx = BindingReflections.findQName(context);
677             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
678                     toDOMInstanceIdentifier)) {
679                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
680                     reg.unregisterPath(ctx, path);
681                 }
682             }
683         }
684
685         @Override
686         public Set<QName> getSupportedRpcs() {
687             return supportedRpcs;
688         }
689
690         @SuppressWarnings({ "unchecked", "rawtypes" })
691         public void createDefaultDomForwarder() {
692             if (baRpcRegistryImpl != null) {
693                 Class<?> cls = rpcServiceType.get();
694                 ClassLoader clsLoader = cls.getClassLoader();
695                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
696
697                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
698                 rpcRouter.registerDefaultService(proxy);
699             }
700         }
701
702         @Override
703         public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
704             checkArgument(rpc != null);
705             checkArgument(domInput != null);
706
707             Class<? extends RpcService> rpcType = rpcServiceType.get();
708             checkState(rpcType != null);
709             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
710             checkState(rpcService != null);
711             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
712
713             try {
714                 return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
715             } catch (Exception e) {
716                 return Futures.immediateFailedFuture(e);
717             }
718         }
719
720         private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) {
721             return strategiesByQName.get(rpc);
722         }
723
724         private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
725                 final Class<? extends RpcService> rpcType) throws Exception {
726             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
727                 @Override
728                 public RpcInvocationStrategy call() throws Exception {
729                     String methodName = BindingMapping.getMethodName(rpc);
730                     Method targetMethod = null;
731                     for (Method possibleMethod : rpcType.getMethods()) {
732                         if (possibleMethod.getName().equals(methodName)
733                                 && BindingReflections.isRpcMethod(possibleMethod)) {
734                             targetMethod = possibleMethod;
735                             break;
736                         }
737                     }
738                     checkState(targetMethod != null, "Rpc method not found");
739                     Optional<Class<?>> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod);
740                     Optional<Class<? extends DataContainer>> inputClass = BindingReflections
741                             .resolveRpcInputClass(targetMethod);
742
743                     RpcInvocationStrategy strategy = null;
744                     if (outputClass.isPresent()) {
745                         if (inputClass.isPresent()) {
746                             strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
747                                     .get());
748                         } else {
749                             strategy = new NoInputInvocationStrategy(rpc, targetMethod, outputClass.get());
750                         }
751                     } else if (inputClass.isPresent()) {
752                         strategy = new NoOutputInvocationStrategy(rpc, targetMethod, inputClass.get());
753                     } else {
754                         strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
755                     }
756                     return strategy;
757                 }
758
759             });
760         }
761     }
762
763     private abstract class RpcInvocationStrategy {
764
765         protected final Method targetMethod;
766         protected final QName rpc;
767
768         public RpcInvocationStrategy(final QName rpc, final Method targetMethod) {
769             this.targetMethod = targetMethod;
770             this.rpc = rpc;
771         }
772
773         public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
774
775         public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
776                 throws Exception;
777
778         public RpcResult<CompositeNode> invokeOn(final RpcService rpcService, final CompositeNode domInput)
779                 throws Exception {
780             return uncheckedInvoke(rpcService, domInput);
781         }
782     }
783
784     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
785
786         @SuppressWarnings("rawtypes")
787         private final WeakReference<Class> inputClass;
788
789         @SuppressWarnings("rawtypes")
790         private final WeakReference<Class> outputClass;
791
792         @SuppressWarnings({ "rawtypes", "unchecked" })
793         public DefaultInvocationStrategy(final QName rpc, final Method targetMethod, final Class<?> outputClass,
794                 final Class<? extends DataContainer> inputClass) {
795             super(rpc, targetMethod);
796             this.outputClass = new WeakReference(outputClass);
797             this.inputClass = new WeakReference(inputClass);
798         }
799
800         @SuppressWarnings("unchecked")
801         @Override
802         public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
803                 throws Exception {
804             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
805             Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
806             if (futureResult == null) {
807                 return Rpcs.getRpcResult(false);
808             }
809             RpcResult<?> bindingResult = futureResult.get();
810             final Object resultObj = bindingResult.getResult();
811             if (resultObj instanceof DataObject) {
812                 final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
813                 return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
814             }
815             return Rpcs.getRpcResult(true);
816         }
817
818         @Override
819         public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
820             if (biRpcRegistry == null) {
821                 return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
822             }
823
824             CompositeNode xml = mappingService.toDataDom(input);
825             CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
826
827             return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
828                     new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
829                         @Override
830                         public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
831                             Object baResultValue = null;
832                             if (input.getResult() != null) {
833                                 baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
834                                         input.getResult());
835                             }
836                             return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
837                         }
838                     });
839         }
840     }
841
842     private class NoInputInvocationStrategy extends RpcInvocationStrategy {
843
844         @SuppressWarnings("rawtypes")
845         private final WeakReference<Class> outputClass;
846
847         @SuppressWarnings({ "rawtypes", "unchecked" })
848         public NoInputInvocationStrategy(final QName rpc, final Method targetMethod, final Class<?> outputClass) {
849             super(rpc, targetMethod);
850             this.outputClass = new WeakReference(outputClass);
851         }
852
853         @SuppressWarnings("unchecked")
854         @Override
855         public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
856                 throws Exception {
857             Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService);
858             if (futureResult == null) {
859                 return Rpcs.getRpcResult(false);
860             }
861             RpcResult<?> bindingResult = futureResult.get();
862             final Object resultObj = bindingResult.getResult();
863             if (resultObj instanceof DataObject) {
864                 final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
865                 return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
866             }
867             return Rpcs.getRpcResult(true);
868         }
869
870         @Override
871         public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
872             if (biRpcRegistry != null) {
873                 CompositeNode xml = mappingService.toDataDom(input);
874                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
875                 return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
876                         new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
877                             @Override
878                             public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
879                                 Object baResultValue = null;
880                                 if (input.getResult() != null) {
881                                     baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
882                                             input.getResult());
883                                 }
884                                 return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
885                             }
886                         });
887             } else {
888                 return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
889             }
890         }
891     }
892
893     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
894
895         public NoInputNoOutputInvocationStrategy(final QName rpc, final Method targetMethod) {
896             super(rpc, targetMethod);
897         }
898
899         @Override
900         public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
901                 throws Exception {
902             @SuppressWarnings("unchecked")
903             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
904             RpcResult<Void> bindingResult = result.get();
905             return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
906         }
907
908         @Override
909         public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
910             return Futures.immediateFuture(null);
911         }
912     }
913
914     private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
915
916         @SuppressWarnings("rawtypes")
917         private final WeakReference<Class> inputClass;
918
919         @SuppressWarnings({ "rawtypes", "unchecked" })
920         public NoOutputInvocationStrategy(final QName rpc, final Method targetMethod,
921                 final Class<? extends DataContainer> inputClass) {
922             super(rpc, targetMethod);
923             this.inputClass = new WeakReference(inputClass);
924         }
925
926         @Override
927         public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
928                 throws Exception {
929             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
930             Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
931             if (result == null) {
932                 return Rpcs.getRpcResult(false);
933             }
934             RpcResult<?> bindingResult = result.get();
935             return Rpcs.getRpcResult(true);
936         }
937
938         @Override
939         public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
940             if (biRpcRegistry == null) {
941                 return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
942             }
943
944             CompositeNode xml = mappingService.toDataDom(input);
945             CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
946
947             return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
948                     new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
949                         @Override
950                         public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
951                             return Rpcs.<Void> getRpcResult(input.isSuccessful(), null, input.getErrors());
952                         }
953                     });
954         }
955     }
956
957     public boolean isRpcForwarding() {
958         return rpcForwarding;
959     }
960
961     public boolean isDataForwarding() {
962         return dataForwarding;
963     }
964
965     public boolean isNotificationForwarding() {
966         return notificationForwarding;
967     }
968
969     public BindingIndependentMappingService getMappingService() {
970         return mappingService;
971     }
972
973     public void setBindingNotificationService(final NotificationProviderService baService) {
974         this.baNotifyService = baService;
975
976     }
977
978     public void setDomNotificationService(final NotificationPublishService domService) {
979         this.domNotificationService = domService;
980     }
981
982     private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
983
984         private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
985         private final Set<QName> supportedNotifications = new HashSet<>();
986
987         @Override
988         public Set<QName> getSupportedNotifications() {
989             return Collections.unmodifiableSet(supportedNotifications);
990         }
991
992         @Override
993         public void onNotification(final CompositeNode notification) {
994             QName qname = notification.getNodeType();
995             WeakReference<Class<? extends Notification>> potential = notifications.get(qname);
996             if (potential != null) {
997                 Class<? extends Notification> potentialClass = potential.get();
998                 if (potentialClass != null) {
999                     final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
1000                             notification);
1001
1002                     if (baNotification instanceof Notification) {
1003                         baNotifyService.publish((Notification) baNotification);
1004                     }
1005                 }
1006             }
1007         }
1008
1009         @Override
1010         public void onNotificationSubscribtion(final Class<? extends Notification> notificationType) {
1011             QName qname = BindingReflections.findQName(notificationType);
1012             if (qname != null) {
1013                 WeakReference<Class<? extends Notification>> already = notifications.putIfAbsent(qname,
1014                         new WeakReference<Class<? extends Notification>>(notificationType));
1015                 if (already == null) {
1016                     domNotificationService.addNotificationListener(qname, this);
1017                     supportedNotifications.add(qname);
1018                 }
1019             }
1020         }
1021     }
1022 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.