2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.binding.impl;
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ExecutionException;
21 import org.opendaylight.controller.md.sal.binding.api.BindingDataChangeListener;
22 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
23 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
27 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
34 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
35 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
36 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
37 import org.opendaylight.controller.sal.common.util.Rpcs;
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
39 import org.opendaylight.yangtools.concepts.Delegator;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import com.google.common.base.Function;
52 import com.google.common.util.concurrent.AsyncFunction;
53 import com.google.common.util.concurrent.FutureCallback;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.ListeningExecutorService;
58 public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements DataProviderService, AutoCloseable {
60 private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);
62 private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers = new ConcurrentHashMap<>();
63 private final ListenerRegistry<DataChangeListener> fakeRegistry = ListenerRegistry.create();
64 private final ListeningExecutorService executorService;
66 public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
67 final BindingIndependentMappingService mappingService, final ListeningExecutorService executor) {
68 super(domDataBroker, mappingService);
69 executorService = executor;
70 LOG.info("ForwardedBackwardsCompatibleBroker started.");
74 public DataModificationTransaction beginTransaction() {
75 return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
79 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
80 DataModificationTransaction tx = beginTransaction();
81 return tx.readConfigurationData(path);
85 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
86 DataModificationTransaction tx = beginTransaction();
87 return tx.readOperationalData(path);
91 public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
92 final InstanceIdentifier<? extends DataObject> path,
93 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
96 //transformingCommitHandler = new TransformingDataChangeListener
97 //fakeCommitHandler = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);
99 CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
100 commitHandlers.put(path, reg);
106 public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
107 final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
108 throw new UnsupportedOperationException("Not supported contract.");
112 public ListenerRegistration<DataChangeListener> registerDataChangeListener(
113 final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
116 BindingDataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
117 BindingDataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
119 ListenerRegistration<BindingDataChangeListener> cfgReg = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
120 ListenerRegistration<BindingDataChangeListener> operReg = registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
122 return new LegacyListenerRegistration(listener,cfgReg,operReg);
126 public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
127 final InstanceIdentifier<? extends DataObject> path,
128 final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
129 throw new UnsupportedOperationException("Data reader contract is not supported.");
133 public void close() throws Exception {
134 // TODO Auto-generated method stub
138 public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
140 final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
141 LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
142 ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
145 public Boolean call() throws Exception {
147 for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
149 DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
150 .getInstance().requestCommit(tx);
153 } catch (Exception e) {
154 LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
155 for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
160 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
166 ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
169 public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
170 if(requestCommitSuccess) {
171 return tx.getDelegate().commit();
173 return Futures.immediateFuture(Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError>emptySet()));
177 return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
179 public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
180 if(input.isSuccessful()) {
181 for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
185 LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
186 for(DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
195 private class ForwardedBackwardsCompatibleTransacion extends
196 AbstractForwardedTransaction<DOMDataReadWriteTransaction> implements DataModificationTransaction {
198 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
199 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
200 private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
201 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
202 private TransactionStatus status = TransactionStatus.NEW;
205 public final TransactionStatus getStatus() {
209 protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
210 final BindingToNormalizedNodeCodec codec) {
211 super(delegate, codec);
212 LOG.debug("Tx {} allocated.",getIdentifier());
216 public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
218 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
222 public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
223 DataObject originalObj = readConfigurationData(path);
224 if (originalObj != null) {
225 original.put(path, originalObj);
228 created.put(path, data);
230 updated.put(path, data);
231 doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
235 public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
236 doDelete(getDelegate(), LogicalDatastoreType.OPERATIONAL, path);
241 public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
242 doDelete(getDelegate(), LogicalDatastoreType.CONFIGURATION, path);
246 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
247 return Collections.emptyMap();
251 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
256 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
257 return Collections.emptyMap();
261 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
266 public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
271 public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
272 return Collections.emptySet();
276 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
281 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
282 return Collections.emptyMap();
286 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
288 return doRead(getDelegate(), LogicalDatastoreType.OPERATIONAL, path).get().orNull();
289 } catch (InterruptedException | ExecutionException e) {
290 LOG.error("Read of {} failed.", path,e);
296 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
298 return doRead(getDelegate(), LogicalDatastoreType.CONFIGURATION, path).get().orNull();
299 } catch (InterruptedException | ExecutionException e) {
300 LOG.error("Read of {} failed.", path,e);
306 public Object getIdentifier() {
307 return getDelegate().getIdentifier();
310 private void changeStatus(TransactionStatus status) {
311 LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
312 this.status = status;
316 public ListenableFuture<RpcResult<TransactionStatus>> commit() {
317 final ListenableFuture<RpcResult<TransactionStatus>> f = ForwardedBackwardsCompatibleDataBroker.this.commit(this);
319 changeStatus(TransactionStatus.SUBMITED);
321 Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
323 public void onSuccess(RpcResult<TransactionStatus> result) {
324 changeStatus(result.getResult());
328 public void onFailure(Throwable t) {
329 LOG.error("Transaction {} failed to complete", getIdentifier(), t);
330 changeStatus(TransactionStatus.FAILED);
338 public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
339 throw new UnsupportedOperationException();
344 private class CommitHandlerRegistrationImpl extends
345 AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
347 private final InstanceIdentifier<? extends DataObject> path;
349 public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
350 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
351 super(commitHandler);
356 protected void removeRegistration() {
357 commitHandlers.remove(path, this);
363 private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
365 private final DataChangeListener instance;
366 private final ListenerRegistration<BindingDataChangeListener> cfgReg;
367 private final ListenerRegistration<BindingDataChangeListener> operReg;
369 public LegacyListenerRegistration(final DataChangeListener listener,
370 final ListenerRegistration<BindingDataChangeListener> cfgReg,
371 final ListenerRegistration<BindingDataChangeListener> operReg) {
372 this.instance = listener;
373 this.cfgReg = cfgReg;
374 this.operReg = operReg;
378 public DataChangeListener getInstance() {
383 public void close() {
390 private static class BackwardsCompatibleOperationalDataChangeInvoker implements BindingDataChangeListener, Delegator<DataChangeListener> {
392 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
395 public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
396 this.delegate = listener;
399 @SuppressWarnings({ "unchecked", "rawtypes" })
401 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
403 DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
404 delegate.onDataChanged(legacyChange);
409 public DataChangeListener getDelegate() {
410 return (DataChangeListener) delegate;
415 private static class BackwardsCompatibleConfigurationDataChangeInvoker implements BindingDataChangeListener, Delegator<DataChangeListener> {
418 @SuppressWarnings("rawtypes")
419 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
421 public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
422 this.delegate = listener;
425 @SuppressWarnings({ "unchecked", "rawtypes" })
427 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
429 DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);
431 delegate.onDataChanged(legacyChange);
436 public DataChangeListener getDelegate() {
437 return (DataChangeListener) delegate;