import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
/**
* ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
*/
public class ProxyWriteTransaction implements DOMDataWriteTransaction {
- private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteTransaction.class);
-
- private final ActorRef masterTxActor;
- private final RemoteDeviceId id;
- private final ActorSystem actorSystem;
- private final AtomicBoolean opened = new AtomicBoolean(true);
- private final Timeout askTimeout;
+ private final ProxyWriteAdapter proxyWriteAdapter;
/**
+ * Constructor for {@code ProxyWriteTransaction}.
+ *
* @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
* @param id device id
* @param actorSystem system
- * @param askTimeout
+ * @param askTimeout timeout
*/
public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
final Timeout askTimeout) {
- this.masterTxActor = masterTxActor;
- this.id = id;
- this.actorSystem = actorSystem;
- this.askTimeout = askTimeout;
+ proxyWriteAdapter = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
@Override
public boolean cancel() {
- if (!opened.compareAndSet(true, false)) {
- return false;
- }
- final Future<Object> cancelScalaFuture =
- Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
-
- LOG.trace("{}: Cancel {} via NETCONF", id);
-
- try {
- // here must be Await because AsyncWriteTransaction do not return future
- return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
- } catch (final Exception e) {
- return false;
- }
+ return proxyWriteAdapter.cancel();
}
@Override
public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- if (!opened.compareAndSet(true, false)) {
- throw new IllegalStateException(id + ": Transaction" + getIdentifier() + " is closed");
- }
- final Future<Object> submitScalaFuture =
- Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
-
- LOG.trace("{}: Submit {} via NETCONF", id);
-
- final SettableFuture<Void> settableFuture = SettableFuture.create();
- submitScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
- return;
- }
- if (success instanceof Throwable) {
- settableFuture.setException((Throwable) success);
- } else {
- if (success instanceof SubmitFailedReply) {
- LOG.error("{}: Transaction was not submitted because already closed.", id);
- }
- settableFuture.set(null);
+ return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
+ new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
+ @Override
+ protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
+ return new TransactionCommitFailedException(message, cause);
}
- }
- }, actorSystem.dispatcher());
-
- return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
- @Nullable
- @Override
- public TransactionCommitFailedException apply(@Nullable final Exception input) {
- final String message = "Submit of transaction " + getIdentifier() + " failed";
- return new TransactionCommitFailedException(message, input);
- }
- });
+ });
}
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> commit() {
- LOG.trace("{}: Commit", id);
-
- final CheckedFuture<Void, TransactionCommitFailedException> submit = submit();
- return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
- @Nullable
- @Override
- public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
- return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
- }
- });
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ return proxyWriteAdapter.commit(getIdentifier());
}
@Override
public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
- LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
- masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+ proxyWriteAdapter.delete(store, identifier);
}
@Override
public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
final NormalizedNode<?, ?> data) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
- final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
- LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, identifier, data);
- masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+ proxyWriteAdapter.put(store, identifier, data, getIdentifier());
}
@Override
public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
final NormalizedNode<?, ?> data) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
- final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
- LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, identifier, data);
- masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+ proxyWriteAdapter.merge(store, identifier, data, getIdentifier());
}
@Override