Implement finding a primary based on the shard name and do basic wiring of Distribute...
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / impl / ForwardedBackwardsCompatibleDataBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.impl;
9
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ExecutionException;
20
21 import org.opendaylight.controller.md.sal.binding.api.BindingDataChangeListener;
22 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
23 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
27 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
34 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
35 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
36 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
37 import org.opendaylight.controller.sal.common.util.Rpcs;
38 import org.opendaylight.controller.sal.core.api.model.SchemaService;
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
40 import org.opendaylight.yangtools.concepts.Delegator;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
44 import org.opendaylight.yangtools.yang.binding.DataObject;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.google.common.base.Function;
53 import com.google.common.util.concurrent.AsyncFunction;
54 import com.google.common.util.concurrent.FutureCallback;
55 import com.google.common.util.concurrent.Futures;
56 import com.google.common.util.concurrent.ListenableFuture;
57 import com.google.common.util.concurrent.ListeningExecutorService;
58
59 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
60
61     private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
62
63     private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
64     private final ListeningExecutorService executorService;
65
66     public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
67             final BindingIndependentMappingService mappingService, final SchemaService schemaService,final ListeningExecutorService executor) {
68         super(domDataBroker, mappingService,schemaService);
69         executorService = executor;
70         LOG.info("ForwardedBackwardsCompatibleBroker started.");
71     }
72
73     @Override
74     public DataModificationTransaction beginTransaction() {
75         return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
76     }
77
78     @Override
79     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
80         DataModificationTransaction tx = beginTransaction();
81         return tx.readConfigurationData(path);
82     }
83
84     @Override
85     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
86         DataModificationTransaction tx = beginTransaction();
87         return tx.readOperationalData(path);
88     }
89
90     @Override
91     public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
92             final InstanceIdentifier<? extends DataObject> path,
93             final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
94
95
96         //transformingCommitHandler = new TransformingDataChangeListener
97         //fakeCommitHandler =  registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
98
99         CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
100         commitHandlers.put(path, reg);
101         return reg;
102     }
103
104     @Override
105     @Deprecated
106     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
107             final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
108         throw new UnsupportedOperationException("Not supported contract.");
109     }
110
111     @Override
112     public ListenerRegistration<DataChangeListener> registerDataChangeListener(
113             final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
114
115
116         BindingDataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
117         BindingDataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
118
119         ListenerRegistration<BindingDataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
120         ListenerRegistration<BindingDataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
121
122         return new LegacyListenerRegistration(listener,cfgReg,operReg);
123     }
124
125     @Override
126     public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
127             final InstanceIdentifier<? extends DataObject> path,
128             final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
129         throw new UnsupportedOperationException("Data reader contract is not supported.");
130     }
131
132     public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
133
134         final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
135         LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
136         ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
137
138             @Override
139             public Boolean call() throws Exception {
140                 try {
141                     for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
142
143                         DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
144                                 .getInstance().requestCommit(tx);
145                         subTrans.add(subTx);
146                     }
147                 } catch (Exception e) {
148                     LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
149                     for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
150                         subTx.rollback();
151                     }
152                     return false;
153                 }
154                 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
155                 return true;
156             }
157
158         });
159
160         ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
161
162             @Override
163             public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
164                 if(requestCommitSuccess) {
165                     return tx.getDelegate().commit();
166                 }
167                 return Futures.immediateFuture(Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError>emptySet()));
168             }
169         });
170
171         return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
172             @Override
173             public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
174                 if(input.isSuccessful()) {
175                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
176                         subTx.finish();
177                     }
178                 } else {
179                     LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
180                     for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
181                         subTx.rollback();
182                     }
183                 }
184                 return input;
185             }
186         });
187     }
188
189     private class ForwardedBackwardsCompatibleTransacion extends
190             AbstractForwardedTransaction<DOMDataReadWriteTransaction> implements DataModificationTransaction {
191
192         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
193         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
194         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
195         private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
196         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
197         private TransactionStatus status = TransactionStatus.NEW;
198
199         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
200         private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
201
202
203         @Override
204         public final TransactionStatus getStatus() {
205             return status;
206         }
207
208         protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
209                 final BindingToNormalizedNodeCodec codec) {
210             super(delegate, codec);
211             LOG.debug("Tx {} allocated.",getIdentifier());
212         }
213
214         @Override
215         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
216             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
217             if(previouslyRemoved) {
218                 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
219             } else {
220                 doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
221             }
222         }
223
224         @Override
225         public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
226             boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
227             DataObject originalObj = readConfigurationData(path);
228             if (originalObj != null) {
229                 original.put(path, originalObj);
230
231             } else {
232                 created.put(path, data);
233             }
234             updated.put(path, data);
235             if(previouslyRemoved) {
236                 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
237             } else {
238                 doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
239             }
240         }
241
242         @Override
243         public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
244             posponedRemovedOperational.add(path);
245         }
246
247         @Override
248         public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
249             posponedRemovedConfiguration.add(path);
250         }
251
252         @Override
253         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
254             return Collections.emptyMap();
255         }
256
257         @Override
258         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
259             return created;
260         }
261
262         @Override
263         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
264             return Collections.emptyMap();
265         }
266
267         @Override
268         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
269             return updated;
270         }
271
272         @Override
273         public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
274             return removed;
275         }
276
277         @Override
278         public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
279             return Collections.emptySet();
280         }
281
282         @Override
283         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
284             return original;
285         }
286
287         @Override
288         public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
289             return Collections.emptyMap();
290         }
291
292         @Override
293         public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
294             try {
295                 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
296             } catch (InterruptedException | ExecutionException e) {
297                 LOG.error("Read of {} failed.", path,e);
298                 return null;
299             }
300         }
301
302         @Override
303         public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
304             try {
305                 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
306             } catch (InterruptedException | ExecutionException e) {
307                 LOG.error("Read of {} failed.", path,e);
308                 return null;
309             }
310         }
311
312         @Override
313         public Object getIdentifier() {
314             return getDelegate().getIdentifier();
315         }
316
317         private void changeStatus(final TransactionStatus status) {
318             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
319             this.status = status;
320
321             for(ListenerRegistration<DataTransactionListener> listener : listeners) {
322                 try {
323                     listener.getInstance().onStatusUpdated(this, status);
324                 } catch (Exception e) {
325                     LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
326                 }
327             }
328         }
329
330         @Override
331         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
332
333             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
334                 doDelete(getDelegate(), LogicalDatastoreType.CONFIGURATION, path);
335             }
336
337             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
338                 doDelete(getDelegate(), LogicalDatastoreType.OPERATIONAL, path);
339             }
340
341             changeStatus(TransactionStatus.SUBMITED);
342
343             final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
344
345             Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
346                 @Override
347                 public void onSuccess(final RpcResult<TransactionStatus> result) {
348                     changeStatus(result.getResult());
349                 }
350
351                 @Override
352                 public void onFailure(final Throwable t) {
353                     LOG.error("Transaction {} failed to complete", getIdentifier(), t);
354                     changeStatus(TransactionStatus.FAILED);
355                 }
356             });
357
358             return f;
359         }
360
361         @Override
362         public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
363             return listeners.register(listener);
364         }
365
366     }
367
368     private class CommitHandlerRegistrationImpl extends
369             AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
370
371         private final InstanceIdentifier<? extends DataObject> path;
372
373         public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
374                 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
375             super(commitHandler);
376             this.path = path;
377         }
378
379         @Override
380         protected void removeRegistration() {
381             commitHandlers.remove(path, this);
382         }
383
384     }
385
386
387     private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
388
389         private final DataChangeListener instance;
390         private final ListenerRegistration<BindingDataChangeListener> cfgReg;
391         private final ListenerRegistration<BindingDataChangeListener> operReg;
392
393         public LegacyListenerRegistration(final DataChangeListener listener,
394                 final ListenerRegistration<BindingDataChangeListener> cfgReg,
395                 final ListenerRegistration<BindingDataChangeListener> operReg) {
396             this.instance = listener;
397             this.cfgReg = cfgReg;
398             this.operReg = operReg;
399         }
400
401         @Override
402         public DataChangeListener getInstance() {
403             return instance;
404         }
405
406         @Override
407         public void close() {
408             cfgReg.close();
409             operReg.close();
410         }
411
412     }
413
414     private static class BackwardsCompatibleOperationalDataChangeInvoker implements BindingDataChangeListener, Delegator<DataChangeListener> {
415
416         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
417
418
419         public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
420             this.delegate = listener;
421         }
422
423         @SuppressWarnings({ "unchecked", "rawtypes" })
424         @Override
425         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
426
427             DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
428             delegate.onDataChanged(legacyChange);
429
430         }
431
432         @Override
433         public DataChangeListener getDelegate() {
434             return (DataChangeListener) delegate;
435         }
436
437     }
438
439     private static class BackwardsCompatibleConfigurationDataChangeInvoker implements BindingDataChangeListener, Delegator<DataChangeListener> {
440         private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
441
442         public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
443             this.delegate = listener;
444         }
445
446         @SuppressWarnings({ "unchecked", "rawtypes" })
447         @Override
448         public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
449
450             DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
451
452             delegate.onDataChanged(legacyChange);
453
454         }
455
456         @Override
457         public DataChangeListener getDelegate() {
458             return (DataChangeListener) delegate;
459         }
460
461     }
462 }