Bug 2364: Migrated Binding MD-SAL to not use composites nodes
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / compat / HydrogenDataBrokerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.compat;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.AsyncFunction;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
29 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
32 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
33 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
36 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
39 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
40 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
41 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
42 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
43 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
44 import org.opendaylight.yangtools.concepts.Delegator;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.concepts.Registration;
47 import org.opendaylight.yangtools.util.ListenerRegistry;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 @Deprecated
56 public class HydrogenDataBrokerAdapter implements DataProviderService, AutoCloseable {
57
58     private static final Logger LOG = LoggerFactory.getLogger(HydrogenDataBrokerAdapter.class);
59
60     private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
61     private final ListeningExecutorService executorService = SingletonHolder.getDefaultCommitExecutor();
62
63     private final DataBroker delegate;
64
65     public HydrogenDataBrokerAdapter(final DataBroker dataBroker) {
66         delegate = dataBroker;
67         LOG.info("ForwardedBackwardsCompatibleBroker started.");
68     }
69
70     @Override
71     public DataModificationTransaction beginTransaction() {
72         return new ForwardedBackwardsCompatibleTransacion(delegate.newReadWriteTransaction());
73     }
74
75     @Override
76     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
77         DataModificationTransaction tx = beginTransaction();
78         return tx.readConfigurationData(path);
79     }
80
81     @Override
82     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
83         DataModificationTransaction tx = beginTransaction();
84         return tx.readOperationalData(path);
85     }
86
87     @Override
88     public Registration registerCommitHandler(
89             final InstanceIdentifier<? extends DataObject> path,
90             final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
91         CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
92         commitHandlers.put(path, reg);
93         return reg;
94     }
95
96     @Override
97     @Deprecated
98     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
99             final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
100         throw new UnsupportedOperationException("Not supported contract.");
101     }
102
103     @Override
104     public ListenerRegistration<DataChangeListener> registerDataChangeListener(
105             final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
106
107
108         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
109         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
110
111         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = delegate.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
112         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = delegate.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
113
114         return new LegacyListenerRegistration(listener,cfgReg,operReg);
115     }
116
117     @Override
118     public Registration registerDataReader(
119             final InstanceIdentifier<? extends DataObject> path,
120             final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
121         throw new UnsupportedOperationException("Data reader contract is not supported.");
122     }
123
124     public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
125
126         final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
127         LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
128         ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
129
130             @Override
131             public Boolean call() throws Exception {
132                 try {
133                     for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
134
135                         DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
136                                 .getInstance().requestCommit(tx);
137                         subTrans.add(subTx);
138                     }
139                 } catch (Exception e) {
140                     LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
141                     for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
142                         subTx.rollback();
143                     }
144                     return false;
145                 }
146                 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
147                 return true;
148             }
149
150         });
151
152         ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
153
154             @Override
155             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
156                 if(requestCommitSuccess) {
157                     return AbstractDataTransaction.convertToLegacyCommitFuture(tx.delegate.submit());
158                 }
159                 return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
160             }
161         });
162
163         return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
164             @Override
165             public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
166                 if(input.isSuccessful()) {
167                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
168                         subTx.finish();
169                     }
170                 } else {
171                     LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
172                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
173                         subTx.rollback();
174                     }
175                 }
176                 return input;
177             }
178         });
179     }
180
181     @Deprecated
182     private class ForwardedBackwardsCompatibleTransacion implements DataModificationTransaction {
183
184         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
185         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
186         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
187         private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
188         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
189         private TransactionStatus status = TransactionStatus.NEW;
190
191         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
192         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
193
194         private final ReadWriteTransaction delegate;
195
196
197         @Override
198         public final TransactionStatus getStatus() {
199             return status;
200         }
201
202         protected ForwardedBackwardsCompatibleTransacion(final ReadWriteTransaction delegate) {
203             this.delegate = delegate;
204             LOG.debug("Tx {} allocated.",getIdentifier());
205         }
206
207         @Override
208         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
209             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
210
211             @SuppressWarnings({ "rawtypes", "unchecked" })
212             final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
213             if(previouslyRemoved) {
214                 delegate.put(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
215             } else {
216                 delegate.merge(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
217             }
218         }
219
220         @Override
221         public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
222             boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
223             DataObject originalObj = readConfigurationData(path);
224             if (originalObj != null) {
225                 original.put(path, originalObj);
226
227             } else {
228                 created.put(path, data);
229             }
230             updated.put(path, data);
231             @SuppressWarnings({"rawtypes","unchecked"})
232             final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
233             if(previouslyRemoved) {
234                 delegate.put(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
235             } else {
236                 delegate.merge(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
237             }
238         }
239
240         @Override
241         public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
242             posponedRemovedOperational.add(path);
243         }
244
245         @Override
246         public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
247             posponedRemovedConfiguration.add(path);
248         }
249
250         @Override
251         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
252             return Collections.emptyMap();
253         }
254
255         @Override
256         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
257             return created;
258         }
259
260         @Override
261         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
262             return Collections.emptyMap();
263         }
264
265         @Override
266         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
267             return updated;
268         }
269
270         @Override
271         public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
272             return removed;
273         }
274
275         @Override
276         public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
277             return Collections.emptySet();
278         }
279
280         @Override
281         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
282             return original;
283         }
284
285         @Override
286         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
287             return Collections.emptyMap();
288         }
289
290         @Override
291         public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
292             try {
293                 return delegate.read(LogicalDatastoreType.OPERATIONAL, path).get().orNull();
294             } catch (InterruptedException | ExecutionException e) {
295                 LOG.error("Read of {} failed.", path,e);
296                 return null;
297             }
298         }
299
300         @Override
301         public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
302             try {
303                 return delegate.read(LogicalDatastoreType.CONFIGURATION, path).get().orNull();
304             } catch (InterruptedException | ExecutionException e) {
305                 LOG.error("Read of {} failed.", path,e);
306                 return null;
307             }
308         }
309
310         private void changeStatus(final TransactionStatus status) {
311             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
312             this.status = status;
313
314             for(ListenerRegistration<DataTransactionListener> listener : listeners) {
315                 try {
316                     listener.getInstance().onStatusUpdated(this, status);
317                 } catch (Exception e) {
318                     LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
319                 }
320             }
321         }
322
323         @Override
324         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
325
326             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
327                 delegate.delete(LogicalDatastoreType.CONFIGURATION, path);
328             }
329
330             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
331                 delegate.delete(LogicalDatastoreType.OPERATIONAL, path);
332             }
333
334             changeStatus(TransactionStatus.SUBMITED);
335
336             final ListenableFuture<RpcResult<TransactionStatus>> f = HydrogenDataBrokerAdapter.this.commit(this);
337
338             Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
339                 @Override
340                 public void onSuccess(final RpcResult<TransactionStatus> result) {
341                     changeStatus(result.getResult());
342                 }
343
344                 @Override
345                 public void onFailure(final Throwable t) {
346                     LOG.error("Transaction {} failed to complete", getIdentifier(), t);
347                     changeStatus(TransactionStatus.FAILED);
348                 }
349             });
350
351             return f;
352         }
353
354         @Override
355         public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
356             return listeners.register(listener);
357         }
358
359         @Override
360         public Object getIdentifier() {
361             // TODO Auto-generated method stub
362             return null;
363         }
364
365     }
366
367     private class CommitHandlerRegistrationImpl extends
368             AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
369
370         private final InstanceIdentifier<? extends DataObject> path;
371
372         public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
373                 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
374             super(commitHandler);
375             this.path = path;
376         }
377
378         @Override
379         protected void removeRegistration() {
380             commitHandlers.remove(path, this);
381         }
382
383     }
384
385
386     private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
387
388         private final DataChangeListener instance;
389         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
390         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
391
392         public LegacyListenerRegistration(final DataChangeListener listener,
393                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
394                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
395             this.instance = listener;
396             this.cfgReg = cfgReg;
397             this.operReg = operReg;
398         }
399
400         @Override
401         public DataChangeListener getInstance() {
402             return instance;
403         }
404
405         @Override
406         public void close() {
407             cfgReg.close();
408             operReg.close();
409         }
410
411     }
412
413     private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
414
415         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
416
417
418         public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
419             this.delegate = listener;
420         }
421
422         @SuppressWarnings({ "unchecked", "rawtypes" })
423         @Override
424         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
425
426             DataChangeEvent legacyChange = HydrogenDataChangeEvent.createOperational(change);
427             delegate.onDataChanged(legacyChange);
428
429         }
430
431         @Override
432         public DataChangeListener getDelegate() {
433             return (DataChangeListener) delegate;
434         }
435
436     }
437
438     private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
439         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
440
441         public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
442             this.delegate = listener;
443         }
444
445         @SuppressWarnings({ "unchecked", "rawtypes" })
446         @Override
447         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
448
449             DataChangeEvent legacyChange = HydrogenDataChangeEvent.createConfiguration(change);
450
451             delegate.onDataChanged(legacyChange);
452
453         }
454
455         @Override
456         public DataChangeListener getDelegate() {
457             return (DataChangeListener) delegate;
458         }
459
460     }
461
462     @Override
463     public void close() throws Exception {
464         // TODO Auto-generated method stub
465     }
466 }