Bug 1245: Dropped Binding prefix from Binding Data APIs.
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / impl / ForwardedBackwardsCompatibleDataBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.impl;
9
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ExecutionException;
20
21 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
22 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
27 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
29 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
33 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
34 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
35 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
36 import org.opendaylight.controller.sal.common.util.Rpcs;
37 import org.opendaylight.controller.sal.core.api.model.SchemaService;
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
39 import org.opendaylight.yangtools.concepts.Delegator;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import com.google.common.base.Function;
52 import com.google.common.util.concurrent.AsyncFunction;
53 import com.google.common.util.concurrent.FutureCallback;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.ListeningExecutorService;
57
58 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
59
60     private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
61
62     private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
63     private final ListeningExecutorService executorService;
64
65     public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
66             final BindingIndependentMappingService mappingService, final SchemaService schemaService,final ListeningExecutorService executor) {
67         super(domDataBroker, mappingService,schemaService);
68         executorService = executor;
69         LOG.info("ForwardedBackwardsCompatibleBroker started.");
70     }
71
72     @Override
73     public DataModificationTransaction beginTransaction() {
74         return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
75     }
76
77     @Override
78     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
79         DataModificationTransaction tx = beginTransaction();
80         return tx.readConfigurationData(path);
81     }
82
83     @Override
84     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
85         DataModificationTransaction tx = beginTransaction();
86         return tx.readOperationalData(path);
87     }
88
89     @Override
90     public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
91             final InstanceIdentifier<? extends DataObject> path,
92             final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
93
94
95         //transformingCommitHandler = new TransformingDataChangeListener
96         //fakeCommitHandler =  registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
97
98         CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
99         commitHandlers.put(path, reg);
100         return reg;
101     }
102
103     @Override
104     @Deprecated
105     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
106             final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
107         throw new UnsupportedOperationException("Not supported contract.");
108     }
109
110     @Override
111     public ListenerRegistration<DataChangeListener> registerDataChangeListener(
112             final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
113
114
115         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
116         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
117
118         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
119         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
120
121         return new LegacyListenerRegistration(listener,cfgReg,operReg);
122     }
123
124     @Override
125     public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
126             final InstanceIdentifier<? extends DataObject> path,
127             final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
128         throw new UnsupportedOperationException("Data reader contract is not supported.");
129     }
130
131     public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
132
133         final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
134         LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
135         ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
136
137             @Override
138             public Boolean call() throws Exception {
139                 try {
140                     for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
141
142                         DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
143                                 .getInstance().requestCommit(tx);
144                         subTrans.add(subTx);
145                     }
146                 } catch (Exception e) {
147                     LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
148                     for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
149                         subTx.rollback();
150                     }
151                     return false;
152                 }
153                 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
154                 return true;
155             }
156
157         });
158
159         ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
160
161             @Override
162             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
163                 if(requestCommitSuccess) {
164                     return tx.getDelegate().commit();
165                 }
166                 return Futures.immediateFuture(Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError>emptySet()));
167             }
168         });
169
170         return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
171             @Override
172             public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
173                 if(input.isSuccessful()) {
174                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
175                         subTx.finish();
176                     }
177                 } else {
178                     LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
179                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
180                         subTx.rollback();
181                     }
182                 }
183                 return input;
184             }
185         });
186     }
187
188     private class ForwardedBackwardsCompatibleTransacion extends
189             AbstractForwardedTransaction<DOMDataReadWriteTransaction> implements DataModificationTransaction {
190
191         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
192         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
193         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
194         private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
195         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
196         private TransactionStatus status = TransactionStatus.NEW;
197
198         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
199         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
200
201
202         @Override
203         public final TransactionStatus getStatus() {
204             return status;
205         }
206
207         protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
208                 final BindingToNormalizedNodeCodec codec) {
209             super(delegate, codec);
210             LOG.debug("Tx {} allocated.",getIdentifier());
211         }
212
213         @Override
214         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
215             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
216             if(previouslyRemoved) {
217                 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
218             } else {
219                 doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
220             }
221         }
222
223         @Override
224         public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
225             boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
226             DataObject originalObj = readConfigurationData(path);
227             if (originalObj != null) {
228                 original.put(path, originalObj);
229
230             } else {
231                 created.put(path, data);
232             }
233             updated.put(path, data);
234             if(previouslyRemoved) {
235                 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
236             } else {
237                 doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
238             }
239         }
240
241         @Override
242         public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
243             posponedRemovedOperational.add(path);
244         }
245
246         @Override
247         public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
248             posponedRemovedConfiguration.add(path);
249         }
250
251         @Override
252         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
253             return Collections.emptyMap();
254         }
255
256         @Override
257         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
258             return created;
259         }
260
261         @Override
262         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
263             return Collections.emptyMap();
264         }
265
266         @Override
267         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
268             return updated;
269         }
270
271         @Override
272         public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
273             return removed;
274         }
275
276         @Override
277         public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
278             return Collections.emptySet();
279         }
280
281         @Override
282         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
283             return original;
284         }
285
286         @Override
287         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
288             return Collections.emptyMap();
289         }
290
291         @Override
292         public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
293             try {
294                 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
295             } catch (InterruptedException | ExecutionException e) {
296                 LOG.error("Read of {} failed.", path,e);
297                 return null;
298             }
299         }
300
301         @Override
302         public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
303             try {
304                 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
305             } catch (InterruptedException | ExecutionException e) {
306                 LOG.error("Read of {} failed.", path,e);
307                 return null;
308             }
309         }
310
311         @Override
312         public Object getIdentifier() {
313             return getDelegate().getIdentifier();
314         }
315
316         private void changeStatus(final TransactionStatus status) {
317             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
318             this.status = status;
319
320             for(ListenerRegistration<DataTransactionListener> listener : listeners) {
321                 try {
322                     listener.getInstance().onStatusUpdated(this, status);
323                 } catch (Exception e) {
324                     LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
325                 }
326             }
327         }
328
329         @Override
330         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
331
332             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
333                 doDelete(getDelegate(), LogicalDatastoreType.CONFIGURATION, path);
334             }
335
336             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
337                 doDelete(getDelegate(), LogicalDatastoreType.OPERATIONAL, path);
338             }
339
340             changeStatus(TransactionStatus.SUBMITED);
341
342             final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
343
344             Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
345                 @Override
346                 public void onSuccess(final RpcResult<TransactionStatus> result) {
347                     changeStatus(result.getResult());
348                 }
349
350                 @Override
351                 public void onFailure(final Throwable t) {
352                     LOG.error("Transaction {} failed to complete", getIdentifier(), t);
353                     changeStatus(TransactionStatus.FAILED);
354                 }
355             });
356
357             return f;
358         }
359
360         @Override
361         public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
362             return listeners.register(listener);
363         }
364
365     }
366
367     private class CommitHandlerRegistrationImpl extends
368             AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
369
370         private final InstanceIdentifier<? extends DataObject> path;
371
372         public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
373                 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
374             super(commitHandler);
375             this.path = path;
376         }
377
378         @Override
379         protected void removeRegistration() {
380             commitHandlers.remove(path, this);
381         }
382
383     }
384
385
386     private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
387
388         private final DataChangeListener instance;
389         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
390         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
391
392         public LegacyListenerRegistration(final DataChangeListener listener,
393                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
394                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
395             this.instance = listener;
396             this.cfgReg = cfgReg;
397             this.operReg = operReg;
398         }
399
400         @Override
401         public DataChangeListener getInstance() {
402             return instance;
403         }
404
405         @Override
406         public void close() {
407             cfgReg.close();
408             operReg.close();
409         }
410
411     }
412
413     private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
414
415         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
416
417
418         public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
419             this.delegate = listener;
420         }
421
422         @SuppressWarnings({ "unchecked", "rawtypes" })
423         @Override
424         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
425
426             DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
427             delegate.onDataChanged(legacyChange);
428
429         }
430
431         @Override
432         public DataChangeListener getDelegate() {
433             return (DataChangeListener) delegate;
434         }
435
436     }
437
438     private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
439         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
440
441         public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
442             this.delegate = listener;
443         }
444
445         @SuppressWarnings({ "unchecked", "rawtypes" })
446         @Override
447         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
448
449             DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
450
451             delegate.onDataChanged(legacyChange);
452
453         }
454
455         @Override
456         public DataChangeListener getDelegate() {
457             return (DataChangeListener) delegate;
458         }
459
460     }
461 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.