import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@Override
public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- return delegateWrite.submit(getIdentifier());
+ return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
+ new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
+ @Override
+ protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
+ return new TransactionCommitFailedException(message, cause);
+ }
+ });
+ }
+
+ @Override
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ return delegateWrite.commit(getIdentifier());
}
@Override
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
}
}
- public CheckedFuture<Void, TransactionCommitFailedException> submit(final Object identifier) {
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit(final Object identifier) {
if (!opened.compareAndSet(true, false)) {
throw new IllegalStateException(id + ": Transaction" + identifier + " is closed");
}
final Future<Object> submitScalaFuture =
Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
- LOG.trace("{}: Submit {} via NETCONF", id);
+ LOG.trace("{}: Commit {} via NETCONF", id);
- final SettableFuture<Void> settableFuture = SettableFuture.create();
+ final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
submitScalaFuture.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(final Throwable failure, final Object success) throws Throwable {
if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
+ settableFuture.setException(newTransactionCommitFailedException(
+ NetconfTopologyUtils.createMasterIsDownException(id), identifier));
return;
}
if (success instanceof Throwable) {
- settableFuture.setException((Throwable) success);
+ settableFuture.setException(newTransactionCommitFailedException((Throwable) success, identifier));
} else {
if (success instanceof SubmitFailedReply) {
LOG.error("{}: Transaction was not submitted because already closed.", id);
- settableFuture.setException(((SubmitFailedReply) success).getThrowable());
+ settableFuture.setException(newTransactionCommitFailedException(
+ ((SubmitFailedReply) success).getThrowable(), identifier));
return;
}
- settableFuture.set(null);
+ settableFuture.set(CommitInfo.empty());
}
}
}, actorSystem.dispatcher());
- return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
- @Nullable
- @Override
- public TransactionCommitFailedException apply(@Nullable final Exception input) {
- final String message = "Submit of transaction " + identifier + " failed";
- return new TransactionCommitFailedException(message, input);
- }
- });
+ return FluentFuture.from(settableFuture);
+ }
+
+ private static TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure,
+ final Object identifier) {
+ return new TransactionCommitFailedException(
+ String.format("Commit of transaction %s failed", identifier), failure);
}
public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
import akka.actor.ActorSystem;
import akka.util.Timeout;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@Override
public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- return proxyWriteAdapter.submit(getIdentifier());
+ return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
+ new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
+ @Override
+ protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
+ return new TransactionCommitFailedException(message, cause);
+ }
+ });
+ }
+
+ @Override
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ return proxyWriteAdapter.commit(getIdentifier());
}
@Override
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nonnull;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.netconf.api.DocumentedException;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
editStructure, Optional.of(ModifyAction.NONE), "delete");
}
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
+ new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
+ @Override
+ protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
+ return new TransactionCommitFailedException(message, cause);
+ }
+ });
+ }
+
+ @Override
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ final SettableFuture<CommitInfo> resultFuture = SettableFuture.create();
+ Futures.addCallback(commitConfiguration(), new FutureCallback<RpcResult<Void>>() {
+ @Override
+ public void onSuccess(RpcResult<Void> result) {
+ if (!result.isSuccessful()) {
+ final Collection<RpcError> errors = result.getErrors();
+ resultFuture.setException(new TransactionCommitFailedException(
+ String.format("Commit of transaction %s failed", getIdentifier()),
+ errors.toArray(new RpcError[errors.size()])));
+ return;
+ }
+
+ resultFuture.set(CommitInfo.empty());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ resultFuture.setException(new TransactionCommitFailedException(
+ String.format("Commit of transaction %s failed", getIdentifier()), failure));
+ }
+ }, MoreExecutors.directExecutor());
+
+ return FluentFuture.from(resultFuture);
+ }
+
protected final ListenableFuture<RpcResult<Void>> commitConfiguration() {
listeners.forEach(listener -> listener.onTransactionSubmitted(this));
checkNotFinished();
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
return delegateWriteTx.submit();
}
+ @Override
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ return delegateWriteTx.commit();
+ }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final LogicalDatastoreType store, final YangInstanceIdentifier path) {
package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback;
cleanupOnSuccess();
}
- @Override
- public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
- final ListenableFuture<Void> commitFutureAsVoid = Futures.transform(commitConfiguration(), input -> {
- Preconditions.checkArgument(input.isSuccessful() && input.getErrors().isEmpty(),
- "Submit failed with errors: %s", input.getErrors());
- return null;
- }, MoreExecutors.directExecutor());
-
- return Futures.makeChecked(commitFutureAsVoid, input -> new TransactionCommitFailedException(
- "Submit of transaction " + getIdentifier() + " failed", input));
- }
-
/**
* This has to be non blocking since it is called from a callback on commit
* and its netty threadpool that is really sensitive to blocking calls.
package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback;
unlock();
}
- @Override
- public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
- final ListenableFuture<Void> commmitFutureAsVoid = Futures.transform(commitConfiguration(),
- (Function<RpcResult<Void>, Void>) input -> null, MoreExecutors.directExecutor());
-
- return Futures.makeChecked(commmitFutureAsVoid,
- input -> new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed",
- input));
- }
-
@Override
public synchronized ListenableFuture<RpcResult<Void>> performCommit() {
for (final Change change : changes) {