/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.md.sal.binding.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.DataReader;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.Delegator;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

/**
 * Implementation of the legacy (deprecated) {@link DataProviderService} contract that forwards
 * reads, writes and data-change registrations to the {@link DOMDataBroker} handed to the
 * constructor.
 *
 * <p>
 * Legacy {@link DataCommitHandler}s are kept in a local map and are consulted by
 * {@link #commit(ForwardedBackwardsCompatibleTransacion)} before the backing transaction is
 * submitted. Commit-handler listeners and data readers are not supported and the corresponding
 * registration methods throw {@link UnsupportedOperationException}.
 */
@SuppressWarnings("deprecation")
public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDataBroker implements
        DataProviderService, AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(ForwardedBackwardsCompatibleDataBroker.class);

    /** Registered legacy commit handlers, keyed by the path they were registered for. */
    private final ConcurrentHashMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers =
            new ConcurrentHashMap<>();

    /** Executor on which the commit handlers' {@code requestCommit} phase is run. */
    private final ListeningExecutorService executorService;

    /**
     * Creates the broker.
     *
     * @param domDataBroker backing DOM broker, passed to the superclass
     * @param mappingService mapping service, passed to the superclass
     * @param schemaService schema service, passed to the superclass
     * @param executor executor used to invoke legacy commit handlers during {@code commit}
     */
    public ForwardedBackwardsCompatibleDataBroker(final DOMDataBroker domDataBroker,
            final BindingIndependentMappingService mappingService, final SchemaService schemaService,
            final ListeningExecutorService executor) {
        super(domDataBroker, mappingService, schemaService);
        executorService = executor;
        LOG.info("ForwardedBackwardsCompatibleBroker started.");
    }

    /**
     * Allocates a new legacy transaction backed by a fresh read-write transaction of the delegate.
     *
     * @return new legacy transaction
     */
    @Override
    public DataModificationTransaction beginTransaction() {
        return new ForwardedBackwardsCompatibleTransacion(getDelegate().newReadWriteTransaction(), getCodec());
    }

    /**
     * Reads configuration data at {@code path} using a newly allocated transaction.
     *
     * @param path path to read
     * @return data at the path, or {@code null} if it is absent or the read failed
     */
    @Override
    public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
        // NOTE(review): the transaction allocated here is neither committed nor otherwise released
        // in this method - confirm the delegate does not require explicit cleanup.
        DataModificationTransaction tx = beginTransaction();
        return tx.readConfigurationData(path);
    }

    /**
     * Reads operational data at {@code path} using a newly allocated transaction.
     *
     * @param path path to read
     * @return data at the path, or {@code null} if it is absent or the read failed
     */
    @Override
    public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
        // NOTE(review): see readConfigurationData - the transaction is not explicitly released here.
        DataModificationTransaction tx = beginTransaction();
        return tx.readOperationalData(path);
    }

    /**
     * Registers a legacy commit handler. A later registration for an equal {@code path} replaces
     * the earlier map entry.
     *
     * @param path path the handler is registered for (used as the map key)
     * @param commitHandler handler to invoke during commit
     * @return registration; closing it removes the handler if it is still the one mapped to {@code path}
     */
    @Override
    public Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> registerCommitHandler(
            final InstanceIdentifier<? extends DataObject> path,
            final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {

        //transformingCommitHandler = new TransformingDataChangeListener
        //fakeCommitHandler = registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, listener, DataChangeScope.SUBTREE);

        CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
        commitHandlers.put(path, reg);
        return reg;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    @Deprecated
    public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
            final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
        throw new UnsupportedOperationException("Not supported contract.");
    }

    /**
     * Registers a legacy data change listener on both the configuration and the operational
     * datastore, each with {@link DataChangeScope#SUBTREE} scope.
     *
     * @param path path to listen on
     * @param listener legacy listener to notify
     * @return registration which closes both underlying registrations
     */
    @Override
    public ListenerRegistration<DataChangeListener> registerDataChangeListener(
            final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {

        org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncOperListener =
                new BackwardsCompatibleOperationalDataChangeInvoker(listener);
        org.opendaylight.controller.md.sal.binding.api.DataChangeListener asyncCfgListener =
                new BackwardsCompatibleConfigurationDataChangeInvoker(listener);

        ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg =
                registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, asyncCfgListener,
                        DataChangeScope.SUBTREE);
        ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg =
                registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, asyncOperListener,
                        DataChangeScope.SUBTREE);

        return new LegacyListenerRegistration(listener, cfgReg, operReg);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> registerDataReader(
            final InstanceIdentifier<? extends DataObject> path,
            final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
        throw new UnsupportedOperationException("Data reader contract is not supported.");
    }

    /**
     * Commits a legacy transaction in three chained steps:
     * <ol>
     * <li>On {@code executorService}, {@code requestCommit(tx)} is invoked on every registered
     * commit handler (regardless of the path it was registered for) and the returned
     * sub-transactions are collected. If any invocation throws, the sub-transactions collected so
     * far are rolled back and the step yields {@code false}.</li>
     * <li>If the first step yielded {@code true}, the backing transaction is submitted and its
     * result is converted to the legacy commit future; otherwise an unsuccessful result carrying
     * {@link TransactionStatus#FAILED} is produced without touching the datastore.</li>
     * <li>If the result is successful every sub-transaction is finished, otherwise every
     * sub-transaction is rolled back. The result is passed through unchanged.</li>
     * </ol>
     *
     * @param tx transaction to commit
     * @return future completing with the commit result
     */
    public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {

        final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans =
                new ArrayList<>();
        LOG.debug("Tx: {} Submitted.", tx.getIdentifier());
        ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {

            @Override
            public Boolean call() throws Exception {
                try {
                    for (CommitHandlerRegistrationImpl handler : commitHandlers.values()) {

                        DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
                                .getInstance().requestCommit(tx);
                        subTrans.add(subTx);
                    }
                } catch (Exception e) {
                    // Boundary catch: any handler failure vetoes the commit.
                    LOG.error("Tx: {} Rollback.", tx.getIdentifier(), e);
                    for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
                        subTx.rollback();
                    }
                    return false;
                }
                LOG.debug("Tx: {} Can Commit True.", tx.getIdentifier());
                return true;
            }

        });

        ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit,
                new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {

                    @Override
                    public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess)
                            throws Exception {
                        if (requestCommitSuccess) {
                            return AbstractDataTransaction.convertToLegacyCommitFuture(tx.getDelegate().submit());
                        }
                        return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed()
                                .withResult(TransactionStatus.FAILED).build());
                    }
                });

        return Futures.transform(dataStoreCommit,
                new Function<RpcResult<TransactionStatus>, RpcResult<TransactionStatus>>() {
                    @Override
                    public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
                        if (input.isSuccessful()) {
                            for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
                                subTx.finish();
                            }
                        } else {
                            LOG.error("Tx: {} Rollback - Datastore commit failed.", tx.getIdentifier());
                            for (DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
                                subTx.rollback();
                            }
                        }
                        return input;
                    }
                });
    }

    /**
     * Legacy {@link DataModificationTransaction} layered over a DOM read-write transaction.
     *
     * <p>
     * Configuration writes are additionally recorded in the {@code created}, {@code updated} and
     * {@code original} maps so they can be exposed through the legacy getters. Operational
     * changes are not recorded; the operational getters return empty collections. Removals are
     * postponed and only applied to the backing transaction in {@link #commit()}.
     */
    private class ForwardedBackwardsCompatibleTransacion extends AbstractReadWriteTransaction implements
            DataModificationTransaction {

        private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
        private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
        private final Map<InstanceIdentifier<? extends DataObject>, DataObject> created = new HashMap<>();
        // NOTE(review): nothing in this class adds to this set, so getRemovedConfigurationData()
        // always returns an empty set even after removeConfigurationData() - confirm whether
        // commit handlers are expected to see removed paths.
        private final Set<InstanceIdentifier<? extends DataObject>> removed = new HashSet<>();
        private final Map<InstanceIdentifier<? extends DataObject>, DataObject> original = new HashMap<>();
        private TransactionStatus status = TransactionStatus.NEW;

        /** Operational paths whose deletion is deferred until {@link #commit()}. */
        private final Set<InstanceIdentifier<? extends DataObject>> postponedRemovedOperational = new HashSet<>();
        /** Configuration paths whose deletion is deferred until {@link #commit()}. */
        private final Set<InstanceIdentifier<? extends DataObject>> postponedRemovedConfiguration = new HashSet<>();

        @Override
        public final TransactionStatus getStatus() {
            return status;
        }

        protected ForwardedBackwardsCompatibleTransacion(final DOMDataReadWriteTransaction delegate,
                final BindingToNormalizedNodeCodec codec) {
            super(delegate, codec);
            LOG.debug("Tx {} allocated.", getIdentifier());
        }

        /**
         * Writes operational data. If {@code path} was removed earlier in this transaction the
         * pending removal is cancelled and the data is written with {@code put}; otherwise it is
         * written with {@code merge}.
         */
        @Override
        public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
            boolean previouslyRemoved = postponedRemovedOperational.remove(path);

            @SuppressWarnings({ "rawtypes", "unchecked" })
            final InstanceIdentifier castedPath = (InstanceIdentifier) path;
            // NOTE(review): the trailing 'true' is presumably a "create missing parents" flag -
            // confirm against AbstractReadWriteTransaction.
            if (previouslyRemoved) {
                put(LogicalDatastoreType.OPERATIONAL, castedPath, data, true);
            } else {
                merge(LogicalDatastoreType.OPERATIONAL, castedPath, data, true);
            }
        }

        /**
         * Writes configuration data and records it for the legacy getters: the value currently
         * readable at {@code path} (if any) is stored as original, otherwise the data is recorded
         * as created; in both cases it is recorded as updated. The write itself uses {@code put}
         * if the path was removed earlier in this transaction and {@code merge} otherwise.
         */
        @Override
        public void putConfigurationData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
            boolean previouslyRemoved = postponedRemovedConfiguration.remove(path);
            DataObject originalObj = readConfigurationData(path);
            if (originalObj != null) {
                original.put(path, originalObj);

            } else {
                created.put(path, data);
            }
            updated.put(path, data);
            @SuppressWarnings({ "rawtypes", "unchecked" })
            final InstanceIdentifier castedPath = (InstanceIdentifier) path;
            if (previouslyRemoved) {
                put(LogicalDatastoreType.CONFIGURATION, castedPath, data, true);
            } else {
                merge(LogicalDatastoreType.CONFIGURATION, castedPath, data, true);
            }
        }

        /** Schedules removal of operational data; the delete is issued in {@link #commit()}. */
        @Override
        public void removeOperationalData(final InstanceIdentifier<? extends DataObject> path) {
            postponedRemovedOperational.add(path);
        }

        /** Schedules removal of configuration data; the delete is issued in {@link #commit()}. */
        @Override
        public void removeConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
            postponedRemovedConfiguration.add(path);
        }

        @Override
        public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
            return Collections.emptyMap();
        }

        @Override
        public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
            return created;
        }

        @Override
        public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
            return Collections.emptyMap();
        }

        @Override
        public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
            return updated;
        }

        @Override
        public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
            return removed;
        }

        @Override
        public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
            return Collections.emptySet();
        }

        @Override
        public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
            return original;
        }

        @Override
        public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
            return Collections.emptyMap();
        }

        /**
         * Blocking read from the operational datastore through this transaction.
         *
         * @return data at {@code path}, or {@code null} if absent or if the read failed
         */
        @Override
        public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
            return readOrNull(LogicalDatastoreType.OPERATIONAL, path);
        }

        /**
         * Blocking read from the configuration datastore through this transaction.
         *
         * @return data at {@code path}, or {@code null} if absent or if the read failed
         */
        @Override
        public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
            return readOrNull(LogicalDatastoreType.CONFIGURATION, path);
        }

        /**
         * Reads {@code path} from {@code store}, waiting for the result. Failures are logged and
         * reported as {@code null}; if the wait is interrupted the thread's interrupt flag is
         * restored so callers further up the stack can still observe the interruption.
         */
        private DataObject readOrNull(final LogicalDatastoreType store,
                final InstanceIdentifier<? extends DataObject> path) {
            try {
                return doRead(getDelegate(), store, path).get().orNull();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Read of {} failed.", path, e);
                return null;
            } catch (ExecutionException e) {
                LOG.error("Read of {} failed.", path, e);
                return null;
            }
        }

        /**
         * Stores the new status and notifies all registered transaction listeners. An exception
         * thrown by one listener is logged and does not prevent the remaining listeners from
         * being notified.
         */
        private void changeStatus(final TransactionStatus status) {
            LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
            this.status = status;

            for (ListenerRegistration<DataTransactionListener> listener : listeners) {
                try {
                    listener.getInstance().onStatusUpdated(this, status);
                } catch (Exception e) {
                    LOG.error("Error during invoking transaction listener {}", listener.getInstance(), e);
                }
            }
        }

        /**
         * Applies the postponed deletes to the backing transaction, marks this transaction as
         * submitted and hands it to the enclosing broker's commit. The status is updated again
         * once the returned future completes: to the reported status on success, or to
         * {@link TransactionStatus#FAILED} if the future fails.
         *
         * @return future completing with the commit result
         */
        @Override
        public ListenableFuture<RpcResult<TransactionStatus>> commit() {

            for (InstanceIdentifier<? extends DataObject> path : postponedRemovedConfiguration) {
                doDelete(LogicalDatastoreType.CONFIGURATION, path);
            }

            for (InstanceIdentifier<? extends DataObject> path : postponedRemovedOperational) {
                doDelete(LogicalDatastoreType.OPERATIONAL, path);
            }

            changeStatus(TransactionStatus.SUBMITED);

            final ListenableFuture<RpcResult<TransactionStatus>> f =
                    ForwardedBackwardsCompatibleDataBroker.this.commit(this);

            Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {

                @Override
                public void onSuccess(final RpcResult<TransactionStatus> result) {
                    changeStatus(result.getResult());
                }

                @Override
                public void onFailure(final Throwable t) {
                    LOG.error("Transaction {} failed to complete", getIdentifier(), t);
                    changeStatus(TransactionStatus.FAILED);
                }
            });

            return f;
        }

        @Override
        public ListenerRegistration<DataTransactionListener> registerListener(final DataTransactionListener listener) {
            return listeners.register(listener);
        }

    }

    /**
     * Registration of a legacy commit handler; removing the registration drops the handler from
     * the enclosing broker's map, but only if this registration is still the mapped value.
     */
    private class CommitHandlerRegistrationImpl extends
            AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {

        private final InstanceIdentifier<? extends DataObject> path;

        public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
                final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
            super(commitHandler);
            this.path = path;
        }

        @Override
        protected void removeRegistration() {
            // Two-argument remove: a newer registration for the same path is left untouched.
            commitHandlers.remove(path, this);
        }

    }

    /**
     * Registration handed back to legacy listeners; bundles the configuration and operational
     * registrations so that both are closed together.
     */
    private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {

        private final DataChangeListener instance;
        private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg;
        private final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg;

        public LegacyListenerRegistration(final DataChangeListener listener,
                final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> cfgReg,
                final ListenerRegistration<org.opendaylight.controller.md.sal.binding.api.DataChangeListener> operReg) {
            this.instance = listener;
            this.cfgReg = cfgReg;
            this.operReg = operReg;
        }

        @Override
        public DataChangeListener getInstance() {
            return instance;
        }

        @Override
        public void close() {
            // Close the operational registration even if closing the configuration one throws.
            try {
                cfgReg.close();
            } finally {
                operReg.close();
            }
        }

    }

    /**
     * Adapts change events to the legacy listener contract by converting them with
     * {@code LegacyDataChangeEvent.createOperational}.
     */
    private static class BackwardsCompatibleOperationalDataChangeInvoker implements
            org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {

        private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?, ?> delegate;

        public BackwardsCompatibleOperationalDataChangeInvoker(final DataChangeListener listener) {
            this.delegate = listener;
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {

            DataChangeEvent legacyChange = LegacyDataChangeEvent.createOperational(change);
            delegate.onDataChanged(legacyChange);

        }

        @Override
        public DataChangeListener getDelegate() {
            return (DataChangeListener) delegate;
        }

    }

    /**
     * Adapts change events to the legacy listener contract by converting them with
     * {@code LegacyDataChangeEvent.createConfiguration}.
     */
    private static class BackwardsCompatibleConfigurationDataChangeInvoker implements
            org.opendaylight.controller.md.sal.binding.api.DataChangeListener, Delegator<DataChangeListener> {

        private final org.opendaylight.controller.md.sal.common.api.data.DataChangeListener<?, ?> delegate;

        public BackwardsCompatibleConfigurationDataChangeInvoker(final DataChangeListener listener) {
            this.delegate = listener;
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {

            DataChangeEvent legacyChange = LegacyDataChangeEvent.createConfiguration(change);

            delegate.onDataChanged(legacyChange);

        }

        @Override
        public DataChangeListener getDelegate() {
            return (DataChangeListener) delegate;
        }

    }
}