Merge "Bug 1029: Remove dead code: sal-schema-repository-api"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / impl / ForwardedBackwardsCompatibleDataBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.impl;
9
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ExecutionException;
20
21 import org.opendaylight.controller.md.sal.binding.api.BindingDataChangeListener;
22 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
23 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
27 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
34 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
35 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
36 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
37 import org.opendaylight.controller.sal.common.util.Rpcs;
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
39 import org.opendaylight.yangtools.concepts.Delegator;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import com.google.common.base.Function;
52 import com.google.common.util.concurrent.AsyncFunction;
53 import com.google.common.util.concurrent.FutureCallback;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.ListeningExecutorService;
57
58 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
59
60     private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
61
62     private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
63     private final ListenerRegistry<DataChangeListener> fakeRegistry = ListenerRegistry.create();
64     private final ListeningExecutorService executorService;
65
66     public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
67             final BindingIndependentMappingService mappingService, final ListeningExecutorService executor) {
68         super(domDataBroker, mappingService);
69         executorService = executor;
70         LOG.info("ForwardedBackwardsCompatibleBroker started.");
71     }
72
73     @Override
74     public DataModificationTransaction beginTransaction() {
75         return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
76     }
77
78     @Override
79     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
80         DataModificationTransaction tx = beginTransaction();
81         return tx.readConfigurationData(path);
82     }
83
84     @Override
85     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
86         DataModificationTransaction tx = beginTransaction();
87         return tx.readOperationalData(path);
88     }
89
90     @Override
91     public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
92             final InstanceIdentifier<? extends DataObject> path,
93             final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
94
95
96         //transformingCommitHandler = new TransformingDataChangeListener
97         //fakeCommitHandler =  registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
98
99         CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
100         commitHandlers.put(path, reg);
101         return reg;
102     }
103
104     @Override
105     @Deprecated
106     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
107             final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
108         throw new UnsupportedOperationException("Not supported contract.");
109     }
110
111     @Override
112     public ListenerRegistration<DataChangeListener> registerDataChangeListener(
113             final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
114
115
116         BindingDataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
117         BindingDataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
118
119         ListenerRegistration<BindingDataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
120         ListenerRegistration<BindingDataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
121
122         return new LegacyListenerRegistration(listener,cfgReg,operReg);
123     }
124
125     @Override
126     public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
127             final InstanceIdentifier<? extends DataObject> path,
128             final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
129         throw new UnsupportedOperationException("Data reader contract is not supported.");
130     }
131
132     @Override
133     public void close() throws Exception {
134         // TODO Auto-generated method stub
135
136     }
137
138     public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
139
140         final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
141         LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
142         ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
143
144             @Override
145             public Boolean call() throws Exception {
146                 try {
147                     for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
148
149                         DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
150                                 .getInstance().requestCommit(tx);
151                         subTrans.add(subTx);
152                     }
153                 } catch (Exception e) {
154                     LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
155                     for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
156                         subTx.rollback();
157                     }
158                     return false;
159                 }
160                 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
161                 return true;
162             }
163
164         });
165
166         ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
167
168             @Override
169             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
170                 if(requestCommitSuccess) {
171                     return tx.getDelegate().commit();
172                 }
173                 return Futures.immediateFuture(Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError>emptySet()));
174             }
175         });
176
177         return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
178             @Override
179             public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
180                 if(input.isSuccessful()) {
181                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
182                         subTx.finish();
183                     }
184                 } else {
185                     LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
186                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
187                         subTx.rollback();
188                     }
189                 }
190                 return input;
191             }
192         });
193     }
194
195     private class ForwardedBackwardsCompatibleTransacion extends
196             AbstractForwardedTransaction<DOMDataReadWriteTransaction> implements DataModificationTransaction {
197
198         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
199         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
200         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
201         private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
202         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
203         private TransactionStatus status = TransactionStatus.NEW;
204
205         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
206         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
207
208
209         @Override
210         public final TransactionStatus getStatus() {
211             return status;
212         }
213
214         protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
215                 final BindingToNormalizedNodeCodec codec) {
216             super(delegate, codec);
217             LOG.debug("Tx {} allocated.",getIdentifier());
218         }
219
220         @Override
221         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
222             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
223             if(previouslyRemoved) {
224                 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
225             } else {
226                 doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
227             }
228         }
229
230         @Override
231         public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
232             boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
233             DataObject originalObj = readConfigurationData(path);
234             if (originalObj != null) {
235                 original.put(path, originalObj);
236
237             } else {
238                 created.put(path, data);
239             }
240             updated.put(path, data);
241             if(previouslyRemoved) {
242                 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
243             } else {
244                 doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
245             }
246         }
247
248         @Override
249         public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
250             posponedRemovedOperational.add(path);
251         }
252
253         @Override
254         public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
255             posponedRemovedConfiguration.add(path);
256         }
257
258         @Override
259         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
260             return Collections.emptyMap();
261         }
262
263         @Override
264         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
265             return created;
266         }
267
268         @Override
269         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
270             return Collections.emptyMap();
271         }
272
273         @Override
274         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
275             return updated;
276         }
277
278         @Override
279         public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
280             return removed;
281         }
282
283         @Override
284         public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
285             return Collections.emptySet();
286         }
287
288         @Override
289         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
290             return original;
291         }
292
293         @Override
294         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
295             return Collections.emptyMap();
296         }
297
298         @Override
299         public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
300             try {
301                 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
302             } catch (InterruptedException | ExecutionException e) {
303                 LOG.error("Read of {} failed.", path,e);
304                 return null;
305             }
306         }
307
308         @Override
309         public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
310             try {
311                 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
312             } catch (InterruptedException | ExecutionException e) {
313                 LOG.error("Read of {} failed.", path,e);
314                 return null;
315             }
316         }
317
318         @Override
319         public Object getIdentifier() {
320             return getDelegate().getIdentifier();
321         }
322
323         private void changeStatus(final TransactionStatus status) {
324             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
325             this.status = status;
326
327             for(ListenerRegistration<DataTransactionListener> listener : listeners) {
328                 try {
329                     listener.getInstance().onStatusUpdated(this, status);
330                 } catch (Exception e) {
331                     LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
332                 }
333             }
334         }
335
336         @Override
337         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
338
339             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
340                 doDelete(getDelegate(), LogicalDatastoreType.CONFIGURATION, path);
341             }
342
343             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
344                 doDelete(getDelegate(), LogicalDatastoreType.OPERATIONAL, path);
345             }
346
347             changeStatus(TransactionStatus.SUBMITED);
348
349             final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
350
351             Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
352                 @Override
353                 public void onSuccess(final RpcResult<TransactionStatus> result) {
354                     changeStatus(result.getResult());
355                 }
356
357                 @Override
358                 public void onFailure(final Throwable t) {
359                     LOG.error("Transaction {} failed to complete", getIdentifier(), t);
360                     changeStatus(TransactionStatus.FAILED);
361                 }
362             });
363
364             return f;
365         }
366
367         @Override
368         public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
369             return listeners.register(listener);
370         }
371
372     }
373
374     private class CommitHandlerRegistrationImpl extends
375             AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
376
377         private final InstanceIdentifier<? extends DataObject> path;
378
379         public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
380                 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
381             super(commitHandler);
382             this.path = path;
383         }
384
385         @Override
386         protected void removeRegistration() {
387             commitHandlers.remove(path, this);
388         }
389
390     }
391
392
393     private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
394
395         private final DataChangeListener instance;
396         private final ListenerRegistration<BindingDataChangeListener> cfgReg;
397         private final ListenerRegistration<BindingDataChangeListener> operReg;
398
399         public LegacyListenerRegistration(final DataChangeListener listener,
400                 final ListenerRegistration<BindingDataChangeListener> cfgReg,
401                 final ListenerRegistration<BindingDataChangeListener> operReg) {
402             this.instance = listener;
403             this.cfgReg = cfgReg;
404             this.operReg = operReg;
405         }
406
407         @Override
408         public DataChangeListener getInstance() {
409             return instance;
410         }
411
412         @Override
413         public void close() {
414             cfgReg.close();
415             operReg.close();
416         }
417
418     }
419
420     private static class BackwardsCompatibleOperationalDataChangeInvoker implements BindingDataChangeListener, Delegator<DataChangeListener> {
421
422         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
423
424
425         public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
426             this.delegate = listener;
427         }
428
429         @SuppressWarnings({ "unchecked", "rawtypes" })
430         @Override
431         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
432
433             DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
434             delegate.onDataChanged(legacyChange);
435
436         }
437
438         @Override
439         public DataChangeListener getDelegate() {
440             return (DataChangeListener) delegate;
441         }
442
443     }
444
445     private static class BackwardsCompatibleConfigurationDataChangeInvoker implements BindingDataChangeListener, Delegator<DataChangeListener> {
446         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
447
448         public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
449             this.delegate = listener;
450         }
451
452         @SuppressWarnings({ "unchecked", "rawtypes" })
453         @Override
454         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
455
456             DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
457
458             delegate.onDataChanged(legacyChange);
459
460         }
461
462         @Override
463         public DataChangeListener getDelegate() {
464             return (DataChangeListener) delegate;
465         }
466
467     }
468 }