2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.binding.compat;
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.AsyncFunction;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
30 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
31 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
32 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
33 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
37 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
38 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
39 import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
40 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
41 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
42 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
43 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
44 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
45 import org.opendaylight.yangtools.concepts.Delegator;
46 import org.opendaylight.yangtools.concepts.ListenerRegistration;
47 import org.opendaylight.yangtools.concepts.Registration;
48 import org.opendaylight.yangtools.util.ListenerRegistry;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 public class HydrogenDataBrokerAdapter implements DataProviderService, AutoCloseable {
59 private static final Logger LOG = LoggerFactory.getLogger(HydrogenDataBrokerAdapter.class);
61 private final ConcurrentMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers =
62 new ConcurrentHashMap<>();
63 private final ListeningExecutorService executorService = SingletonHolder.getDefaultCommitExecutor();
65 private final DataBroker delegate;
67 public HydrogenDataBrokerAdapter(final DataBroker dataBroker) {
68 delegate = dataBroker;
69 LOG.info("ForwardedBackwardsCompatibleBroker started.");
73 public DataModificationTransaction beginTransaction() {
74 return new ForwardedBackwardsCompatibleTransacion(delegate.newReadWriteTransaction());
78 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
79 final DataModificationTransaction tx = beginTransaction();
80 return tx.readConfigurationData(path);
84 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
85 final DataModificationTransaction tx = beginTransaction();
86 return tx.readOperationalData(path);
90 public Registration registerCommitHandler(
91 final InstanceIdentifier<? extends DataObject> path,
92 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
93 final CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
94 commitHandlers.put(path, reg);
100 public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
101 final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
102 throw new UnsupportedOperationException("Not supported contract.");
106 public ListenerRegistration<DataChangeListener> registerDataChangeListener(
107 final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
110 final org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener = new BackwardsCompatibleOperationalDataChangeInvoker(listener);
111 final org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener = new BackwardsCompatibleConfigurationDataChangeInvoker(listener);
113 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg = delegate.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener, DataChangeScope.SUBTREE);
114 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg = delegate.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener, DataChangeScope.SUBTREE);
116 return new LegacyListenerRegistration(listener,cfgReg,operReg);
120 public Registration registerDataReader(
121 final InstanceIdentifier<? extends DataObject> path,
122 final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
123 throw new UnsupportedOperationException("Data reader contract is not supported.");
126 public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
128 final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
129 LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
130 final ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
133 public Boolean call() throws Exception {
135 for (final CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
137 final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
138 .getInstance().requestCommit(tx);
141 } catch (final Exception e) {
142 LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
143 for (final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
148 LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
154 final ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
157 public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
158 if(requestCommitSuccess) {
159 return AbstractDataTransaction.convertToLegacyCommitFuture(tx.delegate.submit());
161 return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
165 return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
167 public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
168 if(input.isSuccessful()) {
169 for(final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
173 LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
174 for(final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
184 private class ForwardedBackwardsCompatibleTransacion implements DataModificationTransaction {
186 private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
187 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
188 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
189 private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
190 private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
191 private TransactionStatus status = TransactionStatus.NEW;
193 private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedOperational = new HashSet<>();
194 private final Set<InstanceIdentifier<? extends DataObject>> posponedRemovedConfiguration = new HashSet<>();
196 private final ReadWriteTransaction delegate;
200 public final TransactionStatus getStatus() {
204 protected ForwardedBackwardsCompatibleTransacion(final ReadWriteTransaction delegate) {
205 this.delegate = delegate;
206 LOG.debug("Tx {} allocated.",getIdentifier());
210 public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
211 final boolean previouslyRemoved = posponedRemovedOperational.remove(path);
213 @SuppressWarnings({ "rawtypes", "unchecked" })
214 final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
215 if(previouslyRemoved) {
216 delegate.put(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
218 delegate.merge(LogicalDatastoreType.OPERATIONAL, castedPath, data,true);
223 public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
224 final boolean previouslyRemoved = posponedRemovedConfiguration.remove(path);
225 final DataObject originalObj = readConfigurationData(path);
226 if (originalObj != null) {
227 original.put(path, originalObj);
230 created.put(path, data);
232 updated.put(path, data);
233 @SuppressWarnings({"rawtypes","unchecked"})
234 final InstanceIdentifier<DataObject> castedPath = (InstanceIdentifier) path;
235 if(previouslyRemoved) {
236 delegate.put(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
238 delegate.merge(LogicalDatastoreType.CONFIGURATION, castedPath, data,true);
243 public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
244 posponedRemovedOperational.add(path);
248 public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
249 posponedRemovedConfiguration.add(path);
253 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
254 return Collections.emptyMap();
258 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
263 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
264 return Collections.emptyMap();
268 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
273 public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
278 public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
279 return Collections.emptySet();
283 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
288 public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
289 return Collections.emptyMap();
293 public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
295 return delegate.read(LogicalDatastoreType.OPERATIONAL, path).get().orNull();
296 } catch (InterruptedException | ExecutionException e) {
297 LOG.error("Read of {} failed.", path,e);
303 public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
305 return delegate.read(LogicalDatastoreType.CONFIGURATION, path).get().orNull();
306 } catch (InterruptedException | ExecutionException e) {
307 LOG.error("Read of {} failed.", path,e);
312 private void changeStatus(final TransactionStatus status) {
313 LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
314 this.status = status;
316 for(final ListenerRegistration<DataTransactionListener> listener : listeners) {
318 listener.getInstance().onStatusUpdated(this, status);
319 } catch (final Exception e) {
320 LOG.error("Error during invoking transaction listener {}",listener.getInstance(),e);
326 public ListenableFuture<RpcResult<TransactionStatus>> commit() {
328 for(final InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
329 delegate.delete(LogicalDatastoreType.CONFIGURATION, path);
332 for(final InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
333 delegate.delete(LogicalDatastoreType.OPERATIONAL, path);
336 changeStatus(TransactionStatus.SUBMITED);
338 final ListenableFuture<RpcResult<TransactionStatus>> f = HydrogenDataBrokerAdapter.this.commit(this);
340 Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
342 public void onSuccess(final RpcResult<TransactionStatus> result) {
343 changeStatus(result.getResult());
347 public void onFailure(final Throwable t) {
348 LOG.error("Transaction {} failed to complete", getIdentifier(), t);
349 changeStatus(TransactionStatus.FAILED);
357 public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
358 return listeners.register(listener);
362 public Object getIdentifier() {
363 // TODO Auto-generated method stub
369 private class CommitHandlerRegistrationImpl extends
370 AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
372 private final InstanceIdentifier<? extends DataObject> path;
374 public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
375 final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
376 super(commitHandler);
381 protected void removeRegistration() {
382 commitHandlers.remove(path, this);
388 private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
390 private final DataChangeListener instance;
391 private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
392 private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;
394 public LegacyListenerRegistration(final DataChangeListener listener,
395 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
396 final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
397 this.instance = listener;
398 this.cfgReg = cfgReg;
399 this.operReg = operReg;
403 public DataChangeListener getInstance() {
408 public void close() {
415 private static class BackwardsCompatibleOperationalDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
417 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
420 public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
421 this.delegate = listener;
424 @SuppressWarnings({ "unchecked", "rawtypes" })
426 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
428 final DataChangeEvent legacyChange = HydrogenDataChangeEvent.createOperational(change);
429 delegate.onDataChanged(legacyChange);
434 public DataChangeListener getDelegate() {
435 return (DataChangeListener) delegate;
440 private static class BackwardsCompatibleConfigurationDataChangeInvoker implements org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {
441 private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?,?> delegate;
443 public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
444 this.delegate = listener;
447 @SuppressWarnings({ "unchecked", "rawtypes" })
449 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
451 final DataChangeEvent legacyChange = HydrogenDataChangeEvent.createConfiguration(change);
453 delegate.onDataChanged(legacyChange);
458 public DataChangeListener getDelegate() {
459 return (DataChangeListener) delegate;
465 public void close() throws Exception {
466 // TODO Auto-generated method stub