import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
}
}
- public CheckedFuture<Void, TransactionCommitFailedException> submit(final Object identifier) {
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit(final Object identifier) {
if (!opened.compareAndSet(true, false)) {
throw new IllegalStateException(id + ": Transaction" + identifier + " is closed");
}
final Future<Object> submitScalaFuture =
Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
- LOG.trace("{}: Submit {} via NETCONF", id);
+ LOG.trace("{}: Commit {} via NETCONF", id);
- final SettableFuture<Void> settableFuture = SettableFuture.create();
+ final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
submitScalaFuture.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(final Throwable failure, final Object success) throws Throwable {
if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
+ settableFuture.setException(newTransactionCommitFailedException(
+ NetconfTopologyUtils.createMasterIsDownException(id), identifier));
return;
}
if (success instanceof Throwable) {
- settableFuture.setException((Throwable) success);
+ settableFuture.setException(newTransactionCommitFailedException((Throwable) success, identifier));
} else {
if (success instanceof SubmitFailedReply) {
LOG.error("{}: Transaction was not submitted because already closed.", id);
- settableFuture.setException(((SubmitFailedReply) success).getThrowable());
+ settableFuture.setException(newTransactionCommitFailedException(
+ ((SubmitFailedReply) success).getThrowable(), identifier));
return;
}
- settableFuture.set(null);
+ settableFuture.set(CommitInfo.empty());
}
}
}, actorSystem.dispatcher());
- return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
- @Nullable
- @Override
- public TransactionCommitFailedException apply(@Nullable final Exception input) {
- final String message = "Submit of transaction " + identifier + " failed";
- return new TransactionCommitFailedException(message, input);
- }
- });
+ return FluentFuture.from(settableFuture);
}
- public ListenableFuture<RpcResult<TransactionStatus>> commit(final Object identifier) {
- LOG.trace("{}: Commit", id);
-
- final CheckedFuture<Void, TransactionCommitFailedException> submit = submit(identifier);
- return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
- @Nullable
- @Override
- public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
- return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
- }
- });
+ private static TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure,
+ final Object identifier) {
+ return new TransactionCommitFailedException(
+ String.format("Commit of transaction %s failed", identifier), failure);
}
public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {