1c6447a4e741615b714657c9356dc6b756fe44c0
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / impl / ForwardedBackwardsCompatibleDataBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.impl;
9
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ExecutionException;
20
21 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
22 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
27 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
29 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
33 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
34 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
35 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
36 import org.opendaylight.controller.sal.common.util.Rpcs;
37 import org.opendaylight.controller.sal.core.api.model.SchemaService;
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
39 import org.opendaylight.yangtools.concepts.Delegator;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import com.google.common.base.Function;
52 import com.google.common.util.concurrent.AsyncFunction;
53 import com.google.common.util.concurrent.FutureCallback;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.ListeningExecutorService;
57
58 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
59
60     private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
61
62     private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
63     private final ListeningExecutorService executorService;
64
65     public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
66             final BindingIndependentMappingService mappingService, final SchemaService schemaService,final ListeningExecutorService executor) {
67         super(domDataBroker, mappingService,schemaService);
68         executorService = executor;
69         LOG.info("ForwardedBackwardsCompatibleBroker started.");
70     }
71
72     @Override
73     public DataModificationTransaction beginTransaction() {
74         return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
75     }
76
77     @Override
78     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
79         DataModificationTransaction tx = beginTransaction();
80         return tx.readConfigurationData(path);
81     }
82
83     @Override
84     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
85         DataModificationTransaction tx = beginTransaction();
86         return tx.readOperationalData(path);
87     }
88
89     @Override
90     public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
91             final InstanceIdentifier<? extends DataObject> path,
92             final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
93
94
95         //transformingCommitHandler = new TransformingDataChangeListener
96         //fakeCommitHandler =  registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
97
98         CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
99         commitHandlers.put(path, reg);
100         return reg;
101     }
102
103     @Override
104     @Deprecated
105     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
106             final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
107         throw new UnsupportedOperationException("Not supported contract.");
108     }
109
110     @Override
111     public ListenerRegistration<DataChangeListener> registerDataChangeListener(
112             final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
113
114
115         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
116         org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
117
118         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
119         ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
120
121         return new LegacyListenerRegistration(listener,cfgReg,operReg);
122     }
123
124     @Override
125     public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
126             final InstanceIdentifier<? extends DataObject> path,
127             final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
128         throw new UnsupportedOperationException("Data reader contract is not supported.");
129     }
130
131     public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
132
133         final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
134         LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
135         ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
136
137             @Override
138             public Boolean call() throws Exception {
139                 try {
140                     for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
141
142                         DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
143                                 .getInstance().requestCommit(tx);
144                         subTrans.add(subTx);
145                     }
146                 } catch (Exception e) {
147                     LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
148                     for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
149                         subTx.rollback();
150                     }
151                     return false;
152                 }
153                 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
154                 return true;
155             }
156
157         });
158
159         ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
160
161             @Override
162             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
163                 if(requestCommitSuccess) {
164                     return tx.getDelegate().commit();
165                 }
166                 return Futures.immediateFuture(Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError>emptySet()));
167             }
168         });
169
170         return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
171             @Override
172             public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
173                 if(input.isSuccessful()) {
174                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
175                         subTx.finish();
176                     }
177                 } else {
178                     LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
179                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
180                         subTx.rollback();
181                     }
182                 }
183                 return input;
184             }
185         });
186     }
187
188     private class ForwardedBackwardsCompatibleTransacion extends
189             AbstractReadWriteTransaction implements DataModificationTransaction {
190
191         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
192         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
193         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
194         private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
195         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
196         private TransactionStatus status = TransactionStatus.NEW;
197
198         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
199         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
200
201
202         @Override
203         public final TransactionStatus getStatus() {
204             return status;
205         }
206
207         protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
208                 final BindingToNormalizedNodeCodec codec) {
209             super(delegate, codec);
210             LOG.debug("Tx {} allocated.",getIdentifier());
211         }
212
213         @Override
214         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
215             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
216             if(previouslyRemoved) {
217                 doPutWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
218             } else {
219                 doMergeWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
220             }
221         }
222
223         @Override
224         public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
225             boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
226             DataObject originalObj = readConfigurationData(path);
227             if (originalObj != null) {
228                 original.put(path, originalObj);
229
230             } else {
231                 created.put(path, data);
232             }
233             updated.put(path, data);
234             if(previouslyRemoved) {
235                 doPutWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
236             } else {
237                 doMergeWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
238             }
239         }
240
241         @Override
242         public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
243             posponedRemovedOperational.add(path);
244         }
245
246         @Override
247         public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
248             posponedRemovedConfiguration.add(path);
249         }
250
251         @Override
252         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
253             return Collections.emptyMap();
254         }
255
256         @Override
257         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
258             return created;
259         }
260
261         @Override
262         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
263             return Collections.emptyMap();
264         }
265
266         @Override
267         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
268             return updated;
269         }
270
271         @Override
272         public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
273             return removed;
274         }
275
276         @Override
277         public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
278             return Collections.emptySet();
279         }
280
281         @Override
282         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
283             return original;
284         }
285
286         @Override
287         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
288             return Collections.emptyMap();
289         }
290
291         @Override
292         public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
293             try {
294                 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
295             } catch (InterruptedException | ExecutionException e) {
296                 LOG.error("Read of {} failed.", path,e);
297                 return null;
298             }
299         }
300
301         @Override
302         public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
303             try {
304                 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
305             } catch (InterruptedException | ExecutionException e) {
306                 LOG.error("Read of {} failed.", path,e);
307                 return null;
308             }
309         }
310
311         private void changeStatus(final TransactionStatus status) {
312             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
313             this.status = status;
314
315             for(ListenerRegistration<DataTransactionListener> listener : listeners) {
316                 try {
317                     listener.getInstance().onStatusUpdated(this, status);
318                 } catch (Exception e) {
319                     LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
320                 }
321             }
322         }
323
324         @Override
325         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
326
327             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
328                 doDelete(LogicalDatastoreType.CONFIGURATION, path);
329             }
330
331             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
332                 doDelete(LogicalDatastoreType.OPERATIONAL, path);
333             }
334
335             changeStatus(TransactionStatus.SUBMITED);
336
337             final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
338
339             Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
340                 @Override
341                 public void onSuccess(final RpcResult<TransactionStatus> result) {
342                     changeStatus(result.getResult());
343                 }
344
345                 @Override
346                 public void onFailure(final Throwable t) {
347                     LOG.error("Transaction {} failed to complete", getIdentifier(), t);
348                     changeStatus(TransactionStatus.FAILED);
349                 }
350             });
351
352             return f;
353         }
354
355         @Override
356         public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
357             return listeners.register(listener);
358         }
359
360     }
361
362     private class CommitHandlerRegistrationImpl extends
363             AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
364
365         private final InstanceIdentifier<? extends DataObject> path;
366
367         public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
368                 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
369             super(commitHandler);
370             this.path = path;
371         }
372
373         @Override
374         protected void removeRegistration() {
375             commitHandlers.remove(path, this);
376         }
377
378     }
379
380
381     private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
382
383         private final DataChangeListener instance;
384         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
385         private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
386
387         public LegacyListenerRegistration(final DataChangeListener listener,
388                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
389                 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
390             this.instance = listener;
391             this.cfgReg = cfgReg;
392             this.operReg = operReg;
393         }
394
395         @Override
396         public DataChangeListener getInstance() {
397             return instance;
398         }
399
400         @Override
401         public void close() {
402             cfgReg.close();
403             operReg.close();
404         }
405
406     }
407
408     private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
409
410         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
411
412
413         public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
414             this.delegate = listener;
415         }
416
417         @SuppressWarnings({ "unchecked", "rawtypes" })
418         @Override
419         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
420
421             DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
422             delegate.onDataChanged(legacyChange);
423
424         }
425
426         @Override
427         public DataChangeListener getDelegate() {
428             return (DataChangeListener) delegate;
429         }
430
431     }
432
433     private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
434         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
435
436         public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
437             this.delegate = listener;
438         }
439
440         @SuppressWarnings({ "unchecked", "rawtypes" })
441         @Override
442         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
443
444             DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
445
446             delegate.onDataChanged(legacyChange);
447
448         }
449
450         @Override
451         public DataChangeListener getDelegate() {
452             return (DataChangeListener) delegate;
453         }
454
455     }
456 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.