import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Objects;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.common.api.MappingCheckedFuture;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import java.util.Optional;
import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
private final ExecutionContext executionContext;
private final Timeout askTimeout;
- ActorProxyTransactionFacade(ActorRef masterTxActor, RemoteDeviceId id, ExecutionContext executionContext,
- Timeout askTimeout) {
+ ActorProxyTransactionFacade(final ActorRef masterTxActor, final RemoteDeviceId id,
+ final ExecutionContext executionContext, final Timeout askTimeout) {
this.masterTxActor = Objects.requireNonNull(masterTxActor);
this.id = Objects.requireNonNull(id);
this.executionContext = Objects.requireNonNull(executionContext);
}
@Override
- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(LogicalDatastoreType store,
- YangInstanceIdentifier path) {
+ public void close() {
+ cancel();
+ }
+
+ @Override
+ public FluentFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
LOG.debug("{}: Read {} {} failed", id, store, path, failure);
- settableFuture.setException(processFailure(failure));
+
+ final Throwable processedFailure = processFailure(failure);
+ if (processedFailure instanceof ReadFailedException) {
+ settableFuture.setException(processedFailure);
+ } else {
+ settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
+ + " failed", processedFailure));
+ }
return;
}
LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
if (response instanceof EmptyReadResponse) {
- settableFuture.set(Optional.absent());
+ settableFuture.set(Optional.empty());
return;
}
}
}, executionContext);
- return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
+ return settableFuture;
}
@Override
- public CheckedFuture<Boolean, ReadFailedException> exists(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ public FluentFuture<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
- settableFuture.setException(processFailure(failure));
+
+ final Throwable processedFailure = processFailure(failure);
+ if (processedFailure instanceof ReadFailedException) {
+ settableFuture.setException(processedFailure);
+ } else {
+ settableFuture.setException(new ReadFailedException("Exists of store " + store + " path " + path
+ + " failed", processedFailure));
+ }
return;
}
}
}, executionContext);
- return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
+ return settableFuture;
}
@Override
- public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
}
@Override
- public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
}
@Override
- public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
}
@Override
- public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ public FluentFuture<? extends CommitInfo> commit() {
LOG.debug("{}: Commit via actor {}", id, masterTxActor);
final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
failure);
}
- private Throwable processFailure(Throwable failure) {
+ private Throwable processFailure(final Throwable failure) {
if (failure instanceof AskTimeoutException) {
return NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure);
}