import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.netconf.api.ModifyAction;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
@Override
- public List<ListenableFuture<? extends DOMRpcResult>> lock() {
+ public ListenableFuture<DOMRpcResult> lock() {
LOG.debug("{}: Lock via actor {}", id, masterActor);
- masterActor.tell(new LockRequest(), ActorRef.noSender());
- return new ArrayList<>();
+ final SettableFuture<DOMRpcResult> lockResult = SettableFuture.create();
+ final Future<Object> future = Patterns.ask(masterActor, new LockRequest(), askTimeout);
+ future.onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ lockResult.setException(failure);
+ } else if (response instanceof InvokeRpcMessageReply) {
+ lockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
+ } else {
+ lockResult.setException(new ClusteringRpcException("Lock operation returned unexpected type"));
+ LOG.error("{}: Lock via actor {} returned unexpected type", id, masterActor);
+ }
+ }
+ }, executionContext);
+ return lockResult;
}
@Override
- public void unlock() {
+ public ListenableFuture<DOMRpcResult> unlock() {
LOG.debug("{}: Unlock via actor {}", id, masterActor);
- masterActor.tell(new UnlockRequest(), ActorRef.noSender());
+ final SettableFuture<DOMRpcResult> unlockResult = SettableFuture.create();
+ final Future<Object> future = Patterns.ask(masterActor, new UnlockRequest(), askTimeout);
+ future.onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ unlockResult.setException(failure);
+ } else if (response instanceof InvokeRpcMessageReply) {
+ unlockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
+ } else {
+ unlockResult.setException(new ClusteringRpcException("Unlock operation returned unexpected type"));
+ LOG.error("{}: Unlock via actor {} returned unexpected type", id, masterActor);
+ }
+ }
+ }, executionContext);
+ return unlockResult;
}
@Override
- public void discardChanges() {
+ public ListenableFuture<DOMRpcResult> discardChanges() {
LOG.debug("{}: Discard changes via actor {}", id, masterActor);
- masterActor.tell(new DiscardChangesRequest(), ActorRef.noSender());
+ final SettableFuture<DOMRpcResult> discardChangesResult = SettableFuture.create();
+ final Future<Object> future = Patterns.ask(masterActor, new DiscardChangesRequest(), askTimeout);
+ future.onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ discardChangesResult.setException(failure);
+ } else if (response instanceof InvokeRpcMessageReply) {
+ discardChangesResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
+ } else {
+ discardChangesResult.setException(
+ new ClusteringRpcException("Discard changes operation returned unexpected type"));
+ LOG.error("{}: Discard changes via actor {} returned unexpected type", id, masterActor);
+ }
+ }
+ }, executionContext);
+ return discardChangesResult;
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> get(YangInstanceIdentifier path) {
+ public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
LOG.debug("{}: Get {} {} via actor {}", id, OPERATIONAL, path, masterActor);
final Future<Object> future = Patterns.ask(masterActor, new GetRequest(path), askTimeout);
return read(future, OPERATIONAL, path);
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> get(final YangInstanceIdentifier path,
- final List<YangInstanceIdentifier> fields) {
+ public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
+ final List<YangInstanceIdentifier> fields) {
LOG.debug("{}: Get {} {} with fields {} via actor {}", id, OPERATIONAL, path, fields, masterActor);
final Future<Object> future = Patterns.ask(masterActor, new GetWithFieldsRequest(path, fields), askTimeout);
return read(future, OPERATIONAL, path);
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> getConfig(YangInstanceIdentifier path) {
+ public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
LOG.debug("{}: GetConfig {} {} via actor {}", id, CONFIGURATION, path, masterActor);
final Future<Object> future = Patterns.ask(masterActor, new GetConfigRequest(path), askTimeout);
return read(future, CONFIGURATION, path);
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> getConfig(final YangInstanceIdentifier path,
- final List<YangInstanceIdentifier> fields) {
+ public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
+ final List<YangInstanceIdentifier> fields) {
LOG.debug("{}: GetConfig {} {} with fields {} via actor {}", id, CONFIGURATION, path, fields, masterActor);
final Future<Object> future = Patterns.ask(masterActor,
new GetConfigWithFieldsRequest(path, fields), askTimeout);
}
@Override
- public ListenableFuture<? extends DOMRpcResult> merge(LogicalDatastoreType store, YangInstanceIdentifier path,
- NormalizedNode<?, ?> data,
- Optional<ModifyAction> defaultOperation) {
+ public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path, final NormalizedNode data,
+ final Optional<ModifyAction> defaultOperation) {
LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterActor);
masterActor.tell(new MergeEditConfigRequest(
store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
}
@Override
- public ListenableFuture<? extends DOMRpcResult> replace(LogicalDatastoreType store, YangInstanceIdentifier path,
- NormalizedNode<?, ?> data,
- Optional<ModifyAction> defaultOperation) {
+ public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path, final NormalizedNode data,
+ final Optional<ModifyAction> defaultOperation) {
LOG.debug("{}: Replace {} {} via actor {}", id, store, path, masterActor);
masterActor.tell(new ReplaceEditConfigRequest(
}
@Override
- public ListenableFuture<? extends DOMRpcResult> create(LogicalDatastoreType store, YangInstanceIdentifier path,
- NormalizedNode<?, ?> data,
- Optional<ModifyAction> defaultOperation) {
+ public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path, final NormalizedNode data,
+ final Optional<ModifyAction> defaultOperation) {
LOG.debug("{}: Create {} {} via actor {}", id, store, path, masterActor);
masterActor.tell(new CreateEditConfigRequest(
store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
}
@Override
- public ListenableFuture<? extends DOMRpcResult> delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterActor);
masterActor.tell(new DeleteEditConfigRequest(store, path), ActorRef.noSender());
return createResult();
}
@Override
- public ListenableFuture<? extends DOMRpcResult> remove(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
LOG.debug("{}: Remove {} {} via actor {}", id, store, path, masterActor);
masterActor.tell(new RemoveEditConfigRequest(store, path), ActorRef.noSender());
return createResult();
}
@Override
- public ListenableFuture<? extends CommitInfo> commit(
- List<ListenableFuture<? extends DOMRpcResult>> resultsFutures) {
+ public ListenableFuture<? extends DOMRpcResult> commit() {
LOG.debug("{}: Commit via actor {}", id, masterActor);
final Future<Object> future = Patterns.ask(masterActor, new CommitRequest(), askTimeout);
- final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
+ final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
future.onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
LOG.debug("{}: Commit failed", id, failure);
settableFuture.setException(newNetconfServiceFailedException(processFailure(failure)));
- return;
+ } else if (response instanceof InvokeRpcMessageReply) {
+ LOG.debug("{}: Commit succeeded", id);
+ settableFuture.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
+ } else {
+ settableFuture.setException(
+ new ClusteringRpcException("Commit operation returned unexpected type"));
+ LOG.error("{}: Commit via actor {} returned unexpected type", id, masterActor);
}
-
- LOG.debug("{}: Commit succeeded", id);
- settableFuture.set(CommitInfo.empty());
}
private NetconfServiceFailedException newNetconfServiceFailedException(final Throwable failure) {
}
@Override
- public @NonNull Object getDeviceId() {
+ public Object getDeviceId() {
return id;
}
- private SettableFuture<Optional<NormalizedNode<?, ?>>> read(final Future<Object> future,
- final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+ private SettableFuture<Optional<NormalizedNode>> read(final Future<Object> future, final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ final SettableFuture<Optional<NormalizedNode>> settableFuture = SettableFuture.create();
future.onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object response) {
return settableFuture;
}
- @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "https://github.com/spotbugs/spotbugs/issues/811")
private Throwable processFailure(final Throwable failure) {
return failure instanceof AskTimeoutException
? NetconfTopologyUtils.createMasterIsDownException(id, (Exception) failure) : failure;
}
- private ListenableFuture<? extends DOMRpcResult> createResult() {
- final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
- settableFuture.set(new DefaultDOMRpcResult());
- return settableFuture;
+ // FIXME: this is being used in contexts where we should be waiting for a reply
+ private static ListenableFuture<? extends DOMRpcResult> createResult() {
+ return Futures.immediateFuture(new DefaultDOMRpcResult());
+ }
+
+ private static DOMRpcResult mapInvokeRpcMessageReplyToDOMRpcResult(final InvokeRpcMessageReply reply) {
+ if (reply.getNormalizedNodeMessage() == null) {
+ return new DefaultDOMRpcResult(new ArrayList<>(reply.getRpcErrors()));
+ } else {
+ return new DefaultDOMRpcResult(reply.getNormalizedNodeMessage().getNode(), reply.getRpcErrors());
+ }
}
}