f8bc204947634c79c2fcda0148cd9f15c76a35ed
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / impl / ForwardedBackwardsCompatibleDataBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.impl;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.AsyncFunction;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
27 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
30 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
32 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
37 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
38 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
39 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
40 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
41 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
42 import org.opendaylight.controller.sal.core.api.model.SchemaService;
43 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
44 import org.opendaylight.yangtools.concepts.Delegator;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.concepts.Registration;
47 import org.opendaylight.yangtools.util.ListenerRegistry;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 @SuppressWarnings("deprecation")
56 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
57
58     private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
59
60     private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
61     private final ListeningExecutorService executorService;
62
63     public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
64             final BindingToNormalizedNodeCodec mappingService, final SchemaService schemaService,final ListeningExecutorService executor) {
65         super(domDataBroker, mappingService,schemaService);
66         executorService = executor;
67         LOG.info("ForwardedBackwardsCompatibleBroker started.");
68     }
69
70     @Override
71     public DataModificationTransaction beginTransaction() {
72         return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
73     }
74
75     @Override
76     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
77         DataModificationTransaction tx = beginTransaction();
78         return tx.readConfigurationData(path);
79     }
80
81     @Override
82     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
83         DataModificationTransaction tx = beginTransaction();
84         return tx.readOperationalData(path);
85     }
86
87     @Override
88     public Registration registerCommitHandler(
89             final InstanceIdentifier<? extends DataObject> path,
90             final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
91
92
93         //transformingCommitHandler = new TransformingDataChangeListener
94         //fakeCommitHandler =  registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
95
96         CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
97         commitHandlers.put(path, reg);
98         return reg;
99     }
100
101     @Override
102     @Deprecated
103     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
104             final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
105         throw new UnsupportedOperationException("Not supported contract.");
106     }
107
108     @Override
109     public ListenerRegistration<DataChangeListener> registerDataChangeListener(
110             final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
111
112
113         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
114         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
115
116         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
117         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
118
119         return new LegacyListenerRegistration(listener,cfgReg,operReg);
120     }
121
122     @Override
123     public Registration registerDataReader(
124             final InstanceIdentifier<? extends DataObject> path,
125             final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
126         throw new UnsupportedOperationException("Data reader contract is not supported.");
127     }
128
129     public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
130
131         final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
132         LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
133         ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
134
135             @Override
136             public Boolean call() throws Exception {
137                 try {
138                     for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
139
140                         DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
141                                 .getInstance().requestCommit(tx);
142                         subTrans.add(subTx);
143                     }
144                 } catch (Exception e) {
145                     LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
146                     for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
147                         subTx.rollback();
148                     }
149                     return false;
150                 }
151                 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
152                 return true;
153             }
154
155         });
156
157         ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
158
159             @Override
160             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
161                 if(requestCommitSuccess) {
162                     return AbstractDataTransaction.convertToLegacyCommitFuture(tx.getDelegate().submit());
163                 }
164                 return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
165             }
166         });
167
168         return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
169             @Override
170             public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
171                 if(input.isSuccessful()) {
172                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
173                         subTx.finish();
174                     }
175                 } else {
176                     LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
177                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
178                         subTx.rollback();
179                     }
180                 }
181                 return input;
182             }
183         });
184     }
185
186     @Deprecated
187     private class ForwardedBackwardsCompatibleTransacion extends
188             AbstractReadWriteTransaction implements DataModificationTransaction {
189
190         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
191         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
192         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
193         private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
194         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
195         private TransactionStatus status = TransactionStatus.NEW;
196
197         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
198         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
199
200
201         @Override
202         public final TransactionStatus getStatus() {
203             return status;
204         }
205
206         protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
207                 final BindingToNormalizedNodeCodec codec) {
208             super(delegate, codec);
209             LOG.debug("Tx {} allocated.",getIdentifier());
210         }
211
212         @Override
213         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
214             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
215
216             @SuppressWarnings({ "rawtypes", "unchecked" })
217             final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
218             if(previouslyRemoved) {
219                 put(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
220             } else {
221                 merge(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
222             }
223         }
224
225         @Override
226         public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
227             boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
228             DataObject originalObj = readConfigurationData(path);
229             if (originalObj != null) {
230                 original.put(path, originalObj);
231
232             } else {
233                 created.put(path, data);
234             }
235             updated.put(path, data);
236             @SuppressWarnings({"rawtypes","unchecked"})
237             final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
238             if(previouslyRemoved) {
239                 put(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
240             } else {
241                 merge(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
242             }
243         }
244
245         @Override
246         public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
247             posponedRemovedOperational.add(path);
248         }
249
250         @Override
251         public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
252             posponedRemovedConfiguration.add(path);
253         }
254
255         @Override
256         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
257             return Collections.emptyMap();
258         }
259
260         @Override
261         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
262             return created;
263         }
264
265         @Override
266         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
267             return Collections.emptyMap();
268         }
269
270         @Override
271         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
272             return updated;
273         }
274
275         @Override
276         public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
277             return removed;
278         }
279
280         @Override
281         public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
282             return Collections.emptySet();
283         }
284
285         @Override
286         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
287             return original;
288         }
289
290         @Override
291         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
292             return Collections.emptyMap();
293         }
294
295         @Override
296         public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
297             try {
298                 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
299             } catch (InterruptedException | ExecutionException e) {
300                 LOG.error("Read of {} failed.", path,e);
301                 return null;
302             }
303         }
304
305         @Override
306         public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
307             try {
308                 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
309             } catch (InterruptedException | ExecutionException e) {
310                 LOG.error("Read of {} failed.", path,e);
311                 return null;
312             }
313         }
314
315         private void changeStatus(final TransactionStatus status) {
316             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
317             this.status = status;
318
319             for(ListenerRegistration<DataTransactionListener> listener : listeners) {
320                 try {
321                     listener.getInstance().onStatusUpdated(this, status);
322                 } catch (Exception e) {
323                     LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
324                 }
325             }
326         }
327
328         @Override
329         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
330
331             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
332                 doDelete(LogicalDatastoreType.CONFIGURATION, path);
333             }
334
335             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
336                 doDelete(LogicalDatastoreType.OPERATIONAL, path);
337             }
338
339             changeStatus(TransactionStatus.SUBMITED);
340
341             final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
342
343             Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
344                 @Override
345                 public void onSuccess(final RpcResult<TransactionStatus> result) {
346                     changeStatus(result.getResult());
347                 }
348
349                 @Override
350                 public void onFailure(final Throwable t) {
351                     LOG.error("Transaction {} failed to complete", getIdentifier(), t);
352                     changeStatus(TransactionStatus.FAILED);
353                 }
354             });
355
356             return f;
357         }
358
359         @Override
360         public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
361             return listeners.register(listener);
362         }
363
364     }
365
366     private class CommitHandlerRegistrationImpl extends
367             AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
368
369         private final InstanceIdentifier<? extends DataObject> path;
370
371         public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
372                 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
373             super(commitHandler);
374             this.path = path;
375         }
376
377         @Override
378         protected void removeRegistration() {
379             commitHandlers.remove(path, this);
380         }
381
382     }
383
384
385     private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
386
387         private final DataChangeListener instance;
388         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
389         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
390
391         public LegacyListenerRegistration(final DataChangeListener listener,
392                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
393                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
394             this.instance = listener;
395             this.cfgReg = cfgReg;
396             this.operReg = operReg;
397         }
398
399         @Override
400         public DataChangeListener getInstance() {
401             return instance;
402         }
403
404         @Override
405         public void close() {
406             cfgReg.close();
407             operReg.close();
408         }
409
410     }
411
412     private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
413
414         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
415
416
417         public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
418             this.delegate = listener;
419         }
420
421         @SuppressWarnings({ "unchecked", "rawtypes" })
422         @Override
423         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
424
425             DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
426             delegate.onDataChanged(legacyChange);
427
428         }
429
430         @Override
431         public DataChangeListener getDelegate() {
432             return (DataChangeListener) delegate;
433         }
434
435     }
436
437     private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
438         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
439
440         public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
441             this.delegate = listener;
442         }
443
444         @SuppressWarnings({ "unchecked", "rawtypes" })
445         @Override
446         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
447
448             DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
449
450             delegate.onDataChanged(legacyChange);
451
452         }
453
454         @Override
455         public DataChangeListener getDelegate() {
456             return (DataChangeListener) delegate;
457         }
458
459     }
460 }