package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.ExecutionException;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.ModifyAction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
protected final RemoteDeviceId id;
protected final NetconfBaseOps netOps;
protected final boolean rollbackSupport;
+ protected final List<ListenableFuture<DOMRpcResult>> resultsFutures;
+ private final List<TxListener> listeners = new CopyOnWriteArrayList<>();
// Allow commit to be called only once
protected boolean finished = false;
this.netOps = netOps;
this.id = id;
this.rollbackSupport = rollbackSupport;
+ this.resultsFutures = Lists.newArrayList();
init();
}
- static boolean isSuccess(final DOMRpcResult result) {
+ protected static boolean isSuccess(final DOMRpcResult result) {
return result.getErrors().isEmpty();
}
return finished;
}
- protected void invokeBlocking(final String msg, final Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>> op) throws NetconfDocumentedException {
- try {
- final DOMRpcResult compositeNodeRpcResult = op.apply(netOps).get();
- if(isSuccess(compositeNodeRpcResult) == false) {
- throw new NetconfDocumentedException(id + ": " + msg + " failed: " + compositeNodeRpcResult.getErrors(), NetconfDocumentedException.ErrorType.application,
- NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorSeverity.warning);
- }
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (final ExecutionException e) {
- throw new NetconfDocumentedException(id + ": " + msg + " failed: " + e.getMessage(), e, NetconfDocumentedException.ErrorType.application,
- NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorSeverity.warning);
- }
- }
-
@Override
public synchronized boolean cancel() {
if(isFinished()) {
return false;
}
-
+ listeners.forEach(listener -> listener.onTransactionCancelled(this));
finished = true;
cleanup();
return true;
return;
}
- try {
- editConfig(
- netOps.createEditConfigStrcture(Optional.<NormalizedNode<?, ?>>fromNullable(data), Optional.of(ModifyAction.REPLACE), path), Optional.of(ModifyAction.NONE));
- } catch (final NetconfDocumentedException e) {
- handleEditException(path, data, e, "putting");
- }
+ final DataContainerChild<?, ?> editStructure = netOps.createEditConfigStrcture(Optional.<NormalizedNode<?, ?>>fromNullable(data), Optional.of(ModifyAction.REPLACE), path);
+ editConfig(path, Optional.fromNullable(data), editStructure, Optional.of(ModifyAction.NONE), "put");
}
- protected abstract void handleEditException(YangInstanceIdentifier path, NormalizedNode<?, ?> data, NetconfDocumentedException e, String editType);
- protected abstract void handleDeleteException(YangInstanceIdentifier path, NetconfDocumentedException e);
-
@Override
public synchronized void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
checkEditable(store);
return;
}
- try {
- editConfig(
- netOps.createEditConfigStrcture(Optional.<NormalizedNode<?, ?>>fromNullable(data), Optional.<ModifyAction>absent(), path), Optional.<ModifyAction>absent());
- } catch (final NetconfDocumentedException e) {
- handleEditException(path, data, e, "merge");
- }
+ final DataContainerChild<?, ?> editStructure = netOps.createEditConfigStrcture(Optional.<NormalizedNode<?, ?>>fromNullable(data), Optional.<ModifyAction>absent(), path);
+ editConfig(path, Optional.fromNullable(data), editStructure, Optional.<ModifyAction>absent(), "merge");
}
/**
@Override
public synchronized void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
checkEditable(store);
-
- try {
- editConfig(
- netOps.createEditConfigStrcture(Optional.<NormalizedNode<?, ?>>absent(), Optional.of(ModifyAction.DELETE), path), Optional.of(ModifyAction.NONE));
- } catch (final NetconfDocumentedException e) {
- handleDeleteException(path, e);
- }
+ final DataContainerChild<?, ?> editStructure = netOps.createEditConfigStrcture(Optional.<NormalizedNode<?, ?>>absent(), Optional.of(ModifyAction.DELETE), path);
+ editConfig(path, Optional.<NormalizedNode<?, ?>>absent(), editStructure, Optional.of(ModifyAction.NONE), "delete");
}
@Override
public final ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ listeners.forEach(listener -> listener.onTransactionSubmitted(this));
checkNotFinished();
finished = true;
+ final ListenableFuture<RpcResult<TransactionStatus>> result = performCommit();
+ Futures.addCallback(result, new FutureCallback<RpcResult<TransactionStatus>>() {
+ @Override
+ public void onSuccess(@Nullable final RpcResult<TransactionStatus> result) {
+ if (result != null && result.isSuccessful()) {
+ listeners.forEach(txListener -> txListener.onTransactionSuccessful(AbstractWriteTx.this));
+ } else {
+ final TransactionCommitFailedException cause = new TransactionCommitFailedException("Transaction failed", result.getErrors().toArray(new RpcError[result.getErrors().size()]));
+ listeners.forEach(listener -> listener.onTransactionFailed(AbstractWriteTx.this, cause));
+ }
+ }
- return performCommit();
+ @Override
+ public void onFailure(final Throwable t) {
+ listeners.forEach(listener -> listener.onTransactionFailed(AbstractWriteTx.this, t));
+ }
+ });
+ return result;
}
protected abstract ListenableFuture<RpcResult<TransactionStatus>> performCommit();
Preconditions.checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can edit only configuration data, not %s", store);
}
- protected abstract void editConfig(DataContainerChild<?, ?> editStructure, Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException;
+ protected abstract void editConfig(final YangInstanceIdentifier path, final Optional<NormalizedNode<?, ?>> data, final DataContainerChild<?, ?> editStructure, final Optional<ModifyAction> defaultOperation, final String operation);
+
+ protected ListenableFuture<RpcResult<TransactionStatus>> resultsToTxStatus() {
+ final SettableFuture<RpcResult<TransactionStatus>> transformed = SettableFuture.create();
+
+ Futures.addCallback(Futures.allAsList(resultsFutures), new FutureCallback<List<DOMRpcResult>>() {
+ @Override
+ public void onSuccess(final List<DOMRpcResult> domRpcResults) {
+ domRpcResults.forEach(domRpcResult -> {
+ if(!domRpcResult.getErrors().isEmpty() && !transformed.isDone()) {
+ final NetconfDocumentedException exception =
+ new NetconfDocumentedException(id + ":RPC during tx failed",
+ DocumentedException.ErrorType.APPLICATION,
+ DocumentedException.ErrorTag.OPERATION_FAILED,
+ DocumentedException.ErrorSeverity.ERROR);
+ transformed.setException(exception);
+ }
+ });
+
+ if(!transformed.isDone()) {
+ transformed.set(RpcResultBuilder.success(TransactionStatus.COMMITED).build());
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ final NetconfDocumentedException exception =
+ new NetconfDocumentedException(
+ id + ":RPC during tx returned an exception",
+ new Exception(throwable),
+ DocumentedException.ErrorType.APPLICATION,
+ DocumentedException.ErrorTag.OPERATION_FAILED,
+ DocumentedException.ErrorSeverity.ERROR);
+ transformed.setException(exception);
+ }
+ });
+
+ return transformed;
+ }
+
+ AutoCloseable addListener(final TxListener listener) {
+ listeners.add(listener);
+ return () -> listeners.remove(listener);
+ }
}