Merge "BUG-692 Replace strings with ModifyAction enum"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / impl / ForwardedBackwardsCompatibleDataBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.impl;
9
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ExecutionException;
20
21 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
22 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
27 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
29 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
33 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
34 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
35 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
36 import org.opendaylight.controller.sal.core.api.model.SchemaService;
37 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
38 import org.opendaylight.yangtools.concepts.Delegator;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
42 import org.opendaylight.yangtools.yang.binding.DataObject;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.opendaylight.yangtools.yang.common.RpcResult;
45 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
46 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import com.google.common.base.Function;
51 import com.google.common.util.concurrent.AsyncFunction;
52 import com.google.common.util.concurrent.FutureCallback;
53 import com.google.common.util.concurrent.Futures;
54 import com.google.common.util.concurrent.ListenableFuture;
55 import com.google.common.util.concurrent.ListeningExecutorService;
56
57 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
58
59     private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
60
61     private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
62     private final ListeningExecutorService executorService;
63
64     public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
65             final BindingIndependentMappingService mappingService, final SchemaService schemaService,final ListeningExecutorService executor) {
66         super(domDataBroker, mappingService,schemaService);
67         executorService = executor;
68         LOG.info("ForwardedBackwardsCompatibleBroker started.");
69     }
70
71     @Override
72     public DataModificationTransaction beginTransaction() {
73         return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
74     }
75
76     @Override
77     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
78         DataModificationTransaction tx = beginTransaction();
79         return tx.readConfigurationData(path);
80     }
81
82     @Override
83     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
84         DataModificationTransaction tx = beginTransaction();
85         return tx.readOperationalData(path);
86     }
87
88     @Override
89     public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
90             final InstanceIdentifier<? extends DataObject> path,
91             final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
92
93
94         //transformingCommitHandler = new TransformingDataChangeListener
95         //fakeCommitHandler =  registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
96
97         CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
98         commitHandlers.put(path, reg);
99         return reg;
100     }
101
102     @Override
103     @Deprecated
104     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
105             final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
106         throw new UnsupportedOperationException("Not supported contract.");
107     }
108
109     @Override
110     public ListenerRegistration<DataChangeListener> registerDataChangeListener(
111             final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
112
113
114         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
115         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
116
117         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
118         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
119
120         return new LegacyListenerRegistration(listener,cfgReg,operReg);
121     }
122
123     @Override
124     public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
125             final InstanceIdentifier<? extends DataObject> path,
126             final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
127         throw new UnsupportedOperationException("Data reader contract is not supported.");
128     }
129
130     public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
131
132         final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
133         LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
134         ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
135
136             @Override
137             public Boolean call() throws Exception {
138                 try {
139                     for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
140
141                         DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
142                                 .getInstance().requestCommit(tx);
143                         subTrans.add(subTx);
144                     }
145                 } catch (Exception e) {
146                     LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
147                     for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
148                         subTx.rollback();
149                     }
150                     return false;
151                 }
152                 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
153                 return true;
154             }
155
156         });
157
158         ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
159
160             @Override
161             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
162                 if(requestCommitSuccess) {
163                     return tx.getDelegate().commit();
164                 }
165                 return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
166             }
167         });
168
169         return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
170             @Override
171             public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
172                 if(input.isSuccessful()) {
173                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
174                         subTx.finish();
175                     }
176                 } else {
177                     LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
178                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
179                         subTx.rollback();
180                     }
181                 }
182                 return input;
183             }
184         });
185     }
186
187     private class ForwardedBackwardsCompatibleTransacion extends
188             AbstractReadWriteTransaction implements DataModificationTransaction {
189
190         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
191         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
192         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
193         private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
194         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
195         private TransactionStatus status = TransactionStatus.NEW;
196
197         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
198         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
199
200
201         @Override
202         public final TransactionStatus getStatus() {
203             return status;
204         }
205
206         protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
207                 final BindingToNormalizedNodeCodec codec) {
208             super(delegate, codec);
209             LOG.debug("Tx {} allocated.",getIdentifier());
210         }
211
212         @Override
213         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
214             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
215             if(previouslyRemoved) {
216                 doPutWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
217             } else {
218                 doMergeWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
219             }
220         }
221
222         @Override
223         public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
224             boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
225             DataObject originalObj = readConfigurationData(path);
226             if (originalObj != null) {
227                 original.put(path, originalObj);
228
229             } else {
230                 created.put(path, data);
231             }
232             updated.put(path, data);
233             if(previouslyRemoved) {
234                 doPutWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
235             } else {
236                 doMergeWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
237             }
238         }
239
240         @Override
241         public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
242             posponedRemovedOperational.add(path);
243         }
244
245         @Override
246         public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
247             posponedRemovedConfiguration.add(path);
248         }
249
250         @Override
251         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
252             return Collections.emptyMap();
253         }
254
255         @Override
256         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
257             return created;
258         }
259
260         @Override
261         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
262             return Collections.emptyMap();
263         }
264
265         @Override
266         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
267             return updated;
268         }
269
270         @Override
271         public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
272             return removed;
273         }
274
275         @Override
276         public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
277             return Collections.emptySet();
278         }
279
280         @Override
281         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
282             return original;
283         }
284
285         @Override
286         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
287             return Collections.emptyMap();
288         }
289
290         @Override
291         public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
292             try {
293                 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
294             } catch (InterruptedException | ExecutionException e) {
295                 LOG.error("Read of {} failed.", path,e);
296                 return null;
297             }
298         }
299
300         @Override
301         public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
302             try {
303                 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
304             } catch (InterruptedException | ExecutionException e) {
305                 LOG.error("Read of {} failed.", path,e);
306                 return null;
307             }
308         }
309
310         private void changeStatus(final TransactionStatus status) {
311             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
312             this.status = status;
313
314             for(ListenerRegistration<DataTransactionListener> listener : listeners) {
315                 try {
316                     listener.getInstance().onStatusUpdated(this, status);
317                 } catch (Exception e) {
318                     LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
319                 }
320             }
321         }
322
323         @Override
324         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
325
326             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
327                 doDelete(LogicalDatastoreType.CONFIGURATION, path);
328             }
329
330             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
331                 doDelete(LogicalDatastoreType.OPERATIONAL, path);
332             }
333
334             changeStatus(TransactionStatus.SUBMITED);
335
336             final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
337
338             Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
339                 @Override
340                 public void onSuccess(final RpcResult<TransactionStatus> result) {
341                     changeStatus(result.getResult());
342                 }
343
344                 @Override
345                 public void onFailure(final Throwable t) {
346                     LOG.error("Transaction {} failed to complete", getIdentifier(), t);
347                     changeStatus(TransactionStatus.FAILED);
348                 }
349             });
350
351             return f;
352         }
353
354         @Override
355         public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
356             return listeners.register(listener);
357         }
358
359     }
360
361     private class CommitHandlerRegistrationImpl extends
362             AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
363
364         private final InstanceIdentifier<? extends DataObject> path;
365
366         public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
367                 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
368             super(commitHandler);
369             this.path = path;
370         }
371
372         @Override
373         protected void removeRegistration() {
374             commitHandlers.remove(path, this);
375         }
376
377     }
378
379
380     private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
381
382         private final DataChangeListener instance;
383         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
384         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
385
386         public LegacyListenerRegistration(final DataChangeListener listener,
387                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
388                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
389             this.instance = listener;
390             this.cfgReg = cfgReg;
391             this.operReg = operReg;
392         }
393
394         @Override
395         public DataChangeListener getInstance() {
396             return instance;
397         }
398
399         @Override
400         public void close() {
401             cfgReg.close();
402             operReg.close();
403         }
404
405     }
406
407     private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
408
409         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
410
411
412         public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
413             this.delegate = listener;
414         }
415
416         @SuppressWarnings({ "unchecked", "rawtypes" })
417         @Override
418         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
419
420             DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
421             delegate.onDataChanged(legacyChange);
422
423         }
424
425         @Override
426         public DataChangeListener getDelegate() {
427             return (DataChangeListener) delegate;
428         }
429
430     }
431
432     private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
433         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
434
435         public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
436             this.delegate = listener;
437         }
438
439         @SuppressWarnings({ "unchecked", "rawtypes" })
440         @Override
441         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
442
443             DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
444
445             delegate.onDataChanged(legacyChange);
446
447         }
448
449         @Override
450         public DataChangeListener getDelegate() {
451             return (DataChangeListener) delegate;
452         }
453
454     }
455 }