2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.binding.impl;
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ExecutionException;
21 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
22 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
27 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
29 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
33 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
34 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
35 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
36 import org.opendaylight.controller.sal.core.api.model.SchemaService;
37 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
38 import org.opendaylight.yangtools.concepts.Delegator;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
42 import org.opendaylight.yangtools.yang.binding.DataObject;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.opendaylight.yangtools.yang.common.RpcResult;
45 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
46 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 import com.google.common.base.Function;
51 import com.google.common.util.concurrent.AsyncFunction;
52 import com.google.common.util.concurrent.FutureCallback;
53 import com.google.common.util.concurrent.Futures;
54 import com.google.common.util.concurrent.ListenableFuture;
55 import com.google.common.util.concurrent.ListeningExecutorService;
57 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
59 private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
61 private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
62 private final ListeningExecutorService executorService;
64 public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
65 final BindingIndependentMappingService mappingService, final SchemaService schemaService,final ListeningExecutorService executor) {
66 super(domDataBroker, mappingService,schemaService);
67 executorService = executor;
68 LOG.info("ForwardedBackwardsCompatibleBroker started.");
72 public DataModificationTransaction beginTransaction() {
73 return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
77 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
78 DataModificationTransaction tx = beginTransaction();
79 return tx.readConfigurationData(path);
83 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
84 DataModificationTransaction tx = beginTransaction();
85 return tx.readOperationalData(path);
89 public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
90 final InstanceIdentifier<? extends DataObject> path,
91 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
94 //transformingCommitHandler = new TransformingDataChangeListener
95 //fakeCommitHandler = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
97 CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
98 commitHandlers.put(path, reg);
104 public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
105 final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
106 throw new UnsupportedOperationException("Not supported contract.");
110 public ListenerRegistration<DataChangeListener> registerDataChangeListener(
111 final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
114 org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
115 org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
117 ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
118 ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
120 return new LegacyListenerRegistration(listener,cfgReg,operReg);
124 public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
125 final InstanceIdentifier<? extends DataObject> path,
126 final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
127 throw new UnsupportedOperationException("Data reader contract is not supported.");
130 public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
132 final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
133 LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
134 ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
137 public Boolean call() throws Exception {
139 for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
141 DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
142 .getInstance().requestCommit(tx);
145 } catch (Exception e) {
146 LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
147 for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
152 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
158 ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
161 public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
162 if(requestCommitSuccess) {
163 return tx.getDelegate().commit();
165 return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
169 return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
171 public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
172 if(input.isSuccessful()) {
173 for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
177 LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
178 for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
187 private class ForwardedBackwardsCompatibleTransacion extends
188 AbstractReadWriteTransaction implements DataModificationTransaction {
190 private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
191 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
192 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
193 private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
194 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
195 private TransactionStatus status = TransactionStatus.NEW;
197 private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
198 private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
202 public final TransactionStatus getStatus() {
206 protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
207 final BindingToNormalizedNodeCodec codec) {
208 super(delegate, codec);
209 LOG.debug("Tx {} allocated.",getIdentifier());
213 public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
214 boolean previouslyRemoved = posponedRemovedOperational.remove(path);
215 if(previouslyRemoved) {
216 doPutWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
218 doMergeWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
223 public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
224 boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
225 DataObject originalObj = readConfigurationData(path);
226 if (originalObj != null) {
227 original.put(path, originalObj);
230 created.put(path, data);
232 updated.put(path, data);
233 if(previouslyRemoved) {
234 doPutWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
236 doMergeWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
241 public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
242 posponedRemovedOperational.add(path);
246 public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
247 posponedRemovedConfiguration.add(path);
251 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
252 return Collections.emptyMap();
256 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
261 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
262 return Collections.emptyMap();
266 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
271 public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
276 public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
277 return Collections.emptySet();
281 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
286 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
287 return Collections.emptyMap();
291 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
293 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
294 } catch (InterruptedException | ExecutionException e) {
295 LOG.error("Read of {} failed.", path,e);
301 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
303 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
304 } catch (InterruptedException | ExecutionException e) {
305 LOG.error("Read of {} failed.", path,e);
310 private void changeStatus(final TransactionStatus status) {
311 LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
312 this.status = status;
314 for(ListenerRegistration<DataTransactionListener> listener : listeners) {
316 listener.getInstance().onStatusUpdated(this, status);
317 } catch (Exception e) {
318 LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
324 public ListenableFuture<RpcResult<TransactionStatus>> commit() {
326 for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
327 doDelete(LogicalDatastoreType.CONFIGURATION, path);
330 for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
331 doDelete(LogicalDatastoreType.OPERATIONAL, path);
334 changeStatus(TransactionStatus.SUBMITED);
336 final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
338 Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
340 public void onSuccess(final RpcResult<TransactionStatus> result) {
341 changeStatus(result.getResult());
345 public void onFailure(final Throwable t) {
346 LOG.error("Transaction {} failed to complete", getIdentifier(), t);
347 changeStatus(TransactionStatus.FAILED);
355 public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
356 return listeners.register(listener);
361 private class CommitHandlerRegistrationImpl extends
362 AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
364 private final InstanceIdentifier<? extends DataObject> path;
366 public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
367 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
368 super(commitHandler);
373 protected void removeRegistration() {
374 commitHandlers.remove(path, this);
380 private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
382 private final DataChangeListener instance;
383 private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
384 private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
386 public LegacyListenerRegistration(final DataChangeListener listener,
387 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
388 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
389 this.instance = listener;
390 this.cfgReg = cfgReg;
391 this.operReg = operReg;
395 public DataChangeListener getInstance() {
400 public void close() {
407 private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
409 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
412 public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
413 this.delegate = listener;
416 @SuppressWarnings({ "unchecked", "rawtypes" })
418 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
420 DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
421 delegate.onDataChanged(legacyChange);
426 public DataChangeListener getDelegate() {
427 return (DataChangeListener) delegate;
432 private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
433 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
435 public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
436 this.delegate = listener;
439 @SuppressWarnings({ "unchecked", "rawtypes" })
441 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
443 DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
445 delegate.onDataChanged(legacyChange);
450 public DataChangeListener getDelegate() {
451 return (DataChangeListener) delegate;