X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Fsal%2Ftx%2FWriteCandidateTx.java;h=ac24ac8f35acd592e2a855ba671a14b4bc25cda1;hb=4b5ea5b9dbcddd02b69f093ee22d9e7420b403d3;hp=6df2239f5140698f9072809366524d63ceeffecd;hpb=15047776c449b6043e85047466c2215ed31036f1;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java index 6df2239f51..ac24ac8f35 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java @@ -8,23 +8,21 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.ModifyAction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -33,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Tx implementation for netconf devices that support only candidate datastore and no writable running + * Tx implementation for netconf devices that support only candidate datastore and no writable running. * The sequence goes as: *
    *
  1. Lock candidate datastore on tx construction @@ -46,7 +44,8 @@ import org.slf4j.LoggerFactory; *
  2. *
  3. Edit-config in candidate N times * *
  4. *
  5. Commit and Unlock candidate datastore async
  6. @@ -56,22 +55,6 @@ public class WriteCandidateTx extends AbstractWriteTx { private static final Logger LOG = LoggerFactory.getLogger(WriteCandidateTx.class); - private static final Function> RPC_RESULT_TO_TX_STATUS = new Function>() { - @Override - public RpcResult apply(final DOMRpcResult input) { - if (isSuccess(input)) { - return RpcResultBuilder.success(TransactionStatus.COMMITED).build(); - } else { - final RpcResultBuilder failed = RpcResultBuilder.failed(); - for (final RpcError rpcError : input.getErrors()) { - failed.withError(rpcError.getErrorType(), rpcError.getTag(), rpcError.getMessage(), - rpcError.getApplicationTag(), rpcError.getInfo(), rpcError.getCause()); - } - return failed.build(); - } - } - }; - public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final boolean rollbackSupport) { super(rpc, id, rollbackSupport); } @@ -85,7 +68,7 @@ public class WriteCandidateTx extends AbstractWriteTx { private void lock() { final FutureCallback lockCandidateCallback = new FutureCallback() { @Override - public void onSuccess(DOMRpcResult result) { + public void onSuccess(@Nonnull final DOMRpcResult result) { if (isSuccess(result)) { if (LOG.isTraceEnabled()) { LOG.trace("Lock candidate successful"); @@ -96,8 +79,8 @@ public class WriteCandidateTx extends AbstractWriteTx { } @Override - public void onFailure(Throwable t) { - LOG.warn("Lock candidate operation failed. {}", t); + public void onFailure(final Throwable throwable) { + LOG.warn("Lock candidate operation failed. {}", throwable); discardChanges(); } }; @@ -112,47 +95,42 @@ public class WriteCandidateTx extends AbstractWriteTx { @Override public synchronized CheckedFuture submit() { - final ListenableFuture commitFutureAsVoid = Futures.transform(commit(), new Function, Void>() { - @Override - public Void apply(final RpcResult input) { - Preconditions.checkArgument(input.isSuccessful() && input.getErrors().isEmpty(), "Submit failed with errors: %s", input.getErrors()); - return null; - } - }); - - return Futures.makeChecked(commitFutureAsVoid, new Function() { - @Override - public TransactionCommitFailedException apply(final Exception input) { - return new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed", input); - } - }); + final ListenableFuture commitFutureAsVoid = Futures.transform(commitConfiguration(), input -> { + Preconditions.checkArgument(input.isSuccessful() && input.getErrors().isEmpty(), + "Submit failed with errors: %s", input.getErrors()); + return null; + }, MoreExecutors.directExecutor()); + + return Futures.makeChecked(commitFutureAsVoid, input -> new TransactionCommitFailedException( + "Submit of transaction " + getIdentifier() + " failed", input)); } /** - * This has to be non blocking since it is called from a callback on commit and its netty threadpool that is really sensitive to blocking calls + * This has to be non blocking since it is called from a callback on commit + * and its netty threadpool that is really sensitive to blocking calls. */ private void discardChanges() { netOps.discardChanges(new NetconfRpcFutureCallback("Discarding candidate", id)); } @Override - public synchronized ListenableFuture> performCommit() { + public synchronized ListenableFuture> performCommit() { resultsFutures.add(netOps.commit(new NetconfRpcFutureCallback("Commit", id))); - ListenableFuture> txResult = resultsToTxStatus(); + final ListenableFuture> txResult = resultsToTxStatus(); - Futures.addCallback(txResult, new FutureCallback>() { + Futures.addCallback(txResult, new FutureCallback>() { @Override - public void onSuccess(@Nullable RpcResult result) { + public void onSuccess(@Nullable final RpcResult result) { cleanupOnSuccess(); } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable throwable) { // TODO If lock is cause of this failure cleanup will issue warning log // cleanup is trying to do unlock, but this will fail cleanup(); } - }); + }, MoreExecutors.directExecutor()); return txResult; } @@ -168,7 +146,7 @@ public class WriteCandidateTx extends AbstractWriteTx { final Optional defaultOperation, final String operation) { - NetconfRpcFutureCallback editConfigCallback = new NetconfRpcFutureCallback("Edit candidate", id); + final NetconfRpcFutureCallback editConfigCallback = new NetconfRpcFutureCallback("Edit candidate", id); if (defaultOperation.isPresent()) { resultsFutures.add(netOps.editConfigCandidate( @@ -179,7 +157,8 @@ public class WriteCandidateTx extends AbstractWriteTx { } /** - * This has to be non blocking since it is called from a callback on commit and its netty threadpool that is really sensitive to blocking calls + * This has to be non blocking since it is called from a callback on commit + * and its netty threadpool that is really sensitive to blocking calls. */ private void unlock() { netOps.unlockCandidate(new NetconfRpcFutureCallback("Unlock candidate", id));