*/
package org.opendaylight.controller.md.sal.binding.compat;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
import org.opendaylight.yangtools.concepts.Delegator;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(HydrogenDataBrokerAdapter.class);
- private final ConcurrentMap<InstanceIdentifier<?>, CommitHandlerRegistrationImpl> commitHandlers =
- new ConcurrentHashMap<>();
- private final ListeningExecutorService executorService = SingletonHolder.getDefaultCommitExecutor();
-
private final DataBroker delegate;
public HydrogenDataBrokerAdapter(final DataBroker dataBroker) {
return tx.readOperationalData(path);
}
- @Override
- public Registration registerCommitHandler(
- final InstanceIdentifier<? extends DataObject> path,
- final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
- final CommitHandlerRegistrationImpl reg = new CommitHandlerRegistrationImpl(path, commitHandler);
- commitHandlers.put(path, reg);
- return reg;
- }
-
- @Override
- @Deprecated
- public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>> registerCommitHandlerListener(
- final RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlerListener) {
- throw new UnsupportedOperationException("Not supported contract.");
- }
-
@Override
public ListenerRegistration<DataChangeListener> registerDataChangeListener(
final InstanceIdentifier<? extends DataObject> path, final DataChangeListener listener) {
return new LegacyListenerRegistration(listener,cfgReg,operReg);
}
- @Override
- public Registration registerDataReader(
- final InstanceIdentifier<? extends DataObject> path,
- final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> reader) {
- throw new UnsupportedOperationException("Data reader contract is not supported.");
- }
-
- public ListenableFuture<RpcResult<TransactionStatus>> commit(final ForwardedBackwardsCompatibleTransacion tx) {
-
- final List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> subTrans = new ArrayList<>();
- LOG.debug("Tx: {} Submitted.",tx.getIdentifier());
- final ListenableFuture<Boolean> requestCommit = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- try {
- for (final CommitHandlerRegistrationImpl handler : commitHandlers.values()) {
-
- final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx = handler
- .getInstance().requestCommit(tx);
- subTrans.add(subTx);
- }
- } catch (final Exception e) {
- LOG.error("Tx: {} Rollback.",tx.getIdentifier(),e);
- for (final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans) {
- subTx.rollback();
- }
- return false;
- }
- LOG.debug("Tx: {} Can Commit True.",tx.getIdentifier());
- return true;
- }
-
- });
-
- final ListenableFuture<RpcResult<TransactionStatus>> dataStoreCommit = Futures.transform(requestCommit, new AsyncFunction<Boolean, RpcResult<TransactionStatus>>() {
-
- @Override
- public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
- if(requestCommitSuccess) {
- return AbstractDataTransaction.convertToLegacyCommitFuture(tx.delegate.submit());
- }
- return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
- }
- });
-
- return Futures.transform(dataStoreCommit, new Function<RpcResult<TransactionStatus>,RpcResult<TransactionStatus>>() {
- @Override
- public RpcResult<TransactionStatus> apply(final RpcResult<TransactionStatus> input) {
- if(input.isSuccessful()) {
- for(final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
- subTx.finish();
- }
- } else {
- LOG.error("Tx: {} Rollback - Datastore commit failed.",tx.getIdentifier());
- for(final DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> subTx : subTrans ) {
- subTx.rollback();
- }
- }
- return input;
- }
- });
- }
-
@Deprecated
private class ForwardedBackwardsCompatibleTransacion implements DataModificationTransaction {
changeStatus(TransactionStatus.SUBMITED);
- final ListenableFuture<RpcResult<TransactionStatus>> f = HydrogenDataBrokerAdapter.this.commit(this);
+ final ListenableFuture<RpcResult<TransactionStatus>> f = delegate.commit();
Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
@Override
}
- private class CommitHandlerRegistrationImpl extends
- AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
-
- private final InstanceIdentifier<? extends DataObject> path;
-
- public CommitHandlerRegistrationImpl(final InstanceIdentifier<? extends DataObject> path,
- final DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
- super(commitHandler);
- this.path = path;
- }
-
- @Override
- protected void removeRegistration() {
- commitHandlers.remove(path, this);
- }
-
- }
-
-
private static final class LegacyListenerRegistration implements ListenerRegistration<DataChangeListener> {
private final DataChangeListener instance;