2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.binding.impl;
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.AsyncFunction;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
27 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
30 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
32 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
37 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
38 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
39 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
40 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
41 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
42 import org.opendaylight.controller.sal.core.api.model.SchemaService;
43 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
44 import org.opendaylight.yangtools.concepts.Delegator;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.concepts.Registration;
47 import org.opendaylight.yangtools.util.ListenerRegistry;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 @SuppressWarnings("deprecation")
56 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
58 private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
60 private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
61 private final ListeningExecutorService executorService;
63 public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
64 final BindingToNormalizedNodeCodec mappingService, final SchemaService schemaService,final ListeningExecutorService executor) {
65 super(domDataBroker, mappingService,schemaService);
66 executorService = executor;
67 LOG.info("ForwardedBackwardsCompatibleBroker started.");
71 public DataModificationTransaction beginTransaction() {
72 return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
76 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
77 DataModificationTransaction tx = beginTransaction();
78 return tx.readConfigurationData(path);
82 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
83 DataModificationTransaction tx = beginTransaction();
84 return tx.readOperationalData(path);
88 public Registration registerCommitHandler(
89 final InstanceIdentifier<? extends DataObject> path,
90 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
93 //transformingCommitHandler = new TransformingDataChangeListener
94 //fakeCommitHandler = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
96 CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
97 commitHandlers.put(path, reg);
103 public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
104 final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
105 throw new UnsupportedOperationException("Not supported contract.");
109 public ListenerRegistration<DataChangeListener> registerDataChangeListener(
110 final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
113 org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
114 org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
116 ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
117 ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
119 return new LegacyListenerRegistration(listener,cfgReg,operReg);
123 public Registration registerDataReader(
124 final InstanceIdentifier<? extends DataObject> path,
125 final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
126 throw new UnsupportedOperationException("Data reader contract is not supported.");
129 public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
131 final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
132 LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
133 ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
136 public Boolean call() throws Exception {
138 for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
140 DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
141 .getInstance().requestCommit(tx);
144 } catch (Exception e) {
145 LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
146 for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
151 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
157 ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
160 public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
161 if(requestCommitSuccess) {
162 return AbstractDataTransaction.convertToLegacyCommitFuture(tx.getDelegate().submit());
164 return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
168 return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
170 public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
171 if(input.isSuccessful()) {
172 for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
176 LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
177 for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
186 private class ForwardedBackwardsCompatibleTransacion extends
187 AbstractReadWriteTransaction implements DataModificationTransaction {
189 private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
190 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
191 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
192 private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
193 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
194 private TransactionStatus status = TransactionStatus.NEW;
196 private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
197 private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
201 public final TransactionStatus getStatus() {
205 protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
206 final BindingToNormalizedNodeCodec codec) {
207 super(delegate, codec);
208 LOG.debug("Tx {} allocated.",getIdentifier());
212 public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
213 boolean previouslyRemoved = posponedRemovedOperational.remove(path);
215 @SuppressWarnings({ "rawtypes", "unchecked" })
216 final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
217 if(previouslyRemoved) {
218 put(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
220 merge(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
225 public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
226 boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
227 DataObject originalObj = readConfigurationData(path);
228 if (originalObj != null) {
229 original.put(path, originalObj);
232 created.put(path, data);
234 updated.put(path, data);
235 @SuppressWarnings({"rawtypes","unchecked"})
236 final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
237 if(previouslyRemoved) {
238 put(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
240 merge(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
245 public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
246 posponedRemovedOperational.add(path);
250 public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
251 posponedRemovedConfiguration.add(path);
255 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
256 return Collections.emptyMap();
260 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
265 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
266 return Collections.emptyMap();
270 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
275 public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
280 public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
281 return Collections.emptySet();
285 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
290 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
291 return Collections.emptyMap();
295 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
297 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
298 } catch (InterruptedException | ExecutionException e) {
299 LOG.error("Read of {} failed.", path,e);
305 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
307 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
308 } catch (InterruptedException | ExecutionException e) {
309 LOG.error("Read of {} failed.", path,e);
314 private void changeStatus(final TransactionStatus status) {
315 LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
316 this.status = status;
318 for(ListenerRegistration<DataTransactionListener> listener : listeners) {
320 listener.getInstance().onStatusUpdated(this, status);
321 } catch (Exception e) {
322 LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
328 public ListenableFuture<RpcResult<TransactionStatus>> commit() {
330 for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
331 doDelete(LogicalDatastoreType.CONFIGURATION, path);
334 for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
335 doDelete(LogicalDatastoreType.OPERATIONAL, path);
338 changeStatus(TransactionStatus.SUBMITED);
340 final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
342 Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
344 public void onSuccess(final RpcResult<TransactionStatus> result) {
345 changeStatus(result.getResult());
349 public void onFailure(final Throwable t) {
350 LOG.error("Transaction {} failed to complete", getIdentifier(), t);
351 changeStatus(TransactionStatus.FAILED);
359 public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
360 return listeners.register(listener);
365 private class CommitHandlerRegistrationImpl extends
366 AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
368 private final InstanceIdentifier<? extends DataObject> path;
370 public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
371 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
372 super(commitHandler);
377 protected void removeRegistration() {
378 commitHandlers.remove(path, this);
384 private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
386 private final DataChangeListener instance;
387 private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
388 private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
390 public LegacyListenerRegistration(final DataChangeListener listener,
391 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
392 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
393 this.instance = listener;
394 this.cfgReg = cfgReg;
395 this.operReg = operReg;
399 public DataChangeListener getInstance() {
404 public void close() {
411 private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
413 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
416 public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
417 this.delegate = listener;
420 @SuppressWarnings({ "unchecked", "rawtypes" })
422 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
424 DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
425 delegate.onDataChanged(legacyChange);
430 public DataChangeListener getDelegate() {
431 return (DataChangeListener) delegate;
436 private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
437 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
439 public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
440 this.delegate = listener;
443 @SuppressWarnings({ "unchecked", "rawtypes" })
445 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
447 DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
449 delegate.onDataChanged(legacyChange);
454 public DataChangeListener getDelegate() {
455 return (DataChangeListener) delegate;