import akka.actor.ReceiveTimeout;
import akka.actor.Status;
import akka.actor.UntypedAbstractActor;
-import com.google.common.util.concurrent.FluentFuture;
+import akka.util.JavaDurationConverters;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import java.util.ArrayList;
-import java.util.List;
+import java.time.Duration;
import java.util.Optional;
-import org.opendaylight.mdsal.common.api.CommitInfo;
+import java.util.function.Supplier;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceActor.class);
private final NetconfDataTreeService netconfService;
private final long idleTimeout;
- private List<ListenableFuture<? extends DOMRpcResult>> resultsFutures = new ArrayList<>();
-
private NetconfDataTreeServiceActor(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
this.netconfService = netconfService;
this.idleTimeout = idleTimeout.toSeconds();
if (this.idleTimeout > 0) {
- context().setReceiveTimeout(idleTimeout);
+ context().setReceiveTimeout(JavaDurationConverters.asFiniteDuration(idleTimeout));
}
}
@Override
public void onReceive(final Object message) {
- if (message instanceof GetWithFieldsRequest) {
- final GetWithFieldsRequest getRequest = (GetWithFieldsRequest) message;
+ if (message instanceof GetWithFieldsRequest getRequest) {
final YangInstanceIdentifier path = getRequest.getPath();
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.get(
+ final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(
getRequest.getPath(), getRequest.getFields());
context().stop(self());
sendResult(future, path, sender(), self());
- } else if (message instanceof GetRequest) {
- final GetRequest getRequest = (GetRequest) message;
+ } else if (message instanceof GetRequest getRequest) {
final YangInstanceIdentifier path = getRequest.getPath();
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.get(path);
+ final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(path);
context().stop(self());
sendResult(future, path, sender(), self());
- } else if (message instanceof GetConfigWithFieldsRequest) {
- final GetConfigWithFieldsRequest getConfigRequest = (GetConfigWithFieldsRequest) message;
+ } else if (message instanceof GetConfigWithFieldsRequest getConfigRequest) {
final YangInstanceIdentifier path = getConfigRequest.getPath();
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.getConfig(
+ final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(
path, getConfigRequest.getFields());
context().stop(self());
sendResult(future, path, sender(), self());
- } else if (message instanceof GetConfigRequest) {
- final GetConfigRequest getConfigRequest = (GetConfigRequest) message;
+ } else if (message instanceof GetConfigRequest getConfigRequest) {
final YangInstanceIdentifier path = getConfigRequest.getPath();
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.getConfig(path);
+ final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(path);
context().stop(self());
sendResult(future, path, sender(), self());
} else if (message instanceof LockRequest) {
- resultsFutures.addAll(netconfService.lock());
- } else if (message instanceof MergeEditConfigRequest) {
- final MergeEditConfigRequest request = (MergeEditConfigRequest) message;
- resultsFutures.add(netconfService.merge(
+ invokeRpcCall(netconfService::lock, sender(), self());
+ } else if (message instanceof MergeEditConfigRequest request) {
+ netconfService.merge(
request.getStore(),
request.getNormalizedNodeMessage().getIdentifier(),
request.getNormalizedNodeMessage().getNode(),
- Optional.ofNullable(request.getDefaultOperation())));
- } else if (message instanceof ReplaceEditConfigRequest) {
- final ReplaceEditConfigRequest request = (ReplaceEditConfigRequest) message;
- resultsFutures.add(netconfService.replace(
+ Optional.ofNullable(request.getDefaultOperation()));
+ } else if (message instanceof ReplaceEditConfigRequest request) {
+ netconfService.replace(
request.getStore(),
request.getNormalizedNodeMessage().getIdentifier(),
request.getNormalizedNodeMessage().getNode(),
- Optional.ofNullable(request.getDefaultOperation())));
- } else if (message instanceof CreateEditConfigRequest) {
- final CreateEditConfigRequest request = (CreateEditConfigRequest) message;
- resultsFutures.add(netconfService.create(
+ Optional.ofNullable(request.getDefaultOperation()));
+ } else if (message instanceof CreateEditConfigRequest request) {
+ netconfService.create(
request.getStore(),
request.getNormalizedNodeMessage().getIdentifier(),
request.getNormalizedNodeMessage().getNode(),
- Optional.ofNullable(request.getDefaultOperation())));
- } else if (message instanceof DeleteEditConfigRequest) {
- final DeleteEditConfigRequest request = (DeleteEditConfigRequest) message;
- resultsFutures.add(netconfService.delete(request.getStore(), request.getPath()));
- } else if (message instanceof RemoveEditConfigRequest) {
- final RemoveEditConfigRequest request = (RemoveEditConfigRequest) message;
- resultsFutures.add(netconfService.remove(request.getStore(), request.getPath()));
+ Optional.ofNullable(request.getDefaultOperation()));
+ } else if (message instanceof DeleteEditConfigRequest request) {
+ netconfService.delete(request.getStore(), request.getPath());
+ } else if (message instanceof RemoveEditConfigRequest request) {
+ netconfService.remove(request.getStore(), request.getPath());
} else if (message instanceof CommitRequest) {
- context().stop(self());
submit(sender(), self());
} else if (message instanceof DiscardChangesRequest) {
- netconfService.discardChanges();
+ invokeRpcCall(netconfService::discardChanges, sender(), self());
} else if (message instanceof UnlockRequest) {
context().stop(self());
- netconfService.unlock();
+ invokeRpcCall(netconfService::unlock, sender(), self());
} else if (message instanceof ReceiveTimeout) {
LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
idleTimeout);
- netconfService.discardChanges();
- netconfService.unlock();
+ invokeRpcCall(netconfService::discardChanges, sender(), self());
+ invokeRpcCall(netconfService::unlock, sender(), self());
context().stop(self());
} else {
unhandled(message);
}
private void submit(final ActorRef requester, final ActorRef self) {
- final ListenableFuture<? extends CommitInfo> submitFuture = netconfService.commit(resultsFutures);
- FluentFuture.from(submitFuture).addCallback(new FutureCallback<CommitInfo>() {
+ Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(final DOMRpcResult result) {
+ if (result == null) {
+ requester.tell(new EmptyResultResponse(), getSender());
+ return;
+ }
+ NormalizedNodeMessage nodeMessageResp = null;
+ if (result.value() != null) {
+ nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), result.value());
+ }
+ requester.tell(new InvokeRpcMessageReply(nodeMessageResp, result.errors()), self);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ requester.tell(new Status.Failure(throwable), self);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ private void invokeRpcCall(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation,
+ final ActorRef requester, final ActorRef self) {
+ Futures.addCallback(operation.get(), new FutureCallback<DOMRpcResult>() {
@Override
- public void onSuccess(final CommitInfo result) {
- requester.tell(new Status.Success(null), self);
+ public void onSuccess(final DOMRpcResult rpcResult) {
+ if (rpcResult == null) {
+ requester.tell(new EmptyResultResponse(), getSender());
+ return;
+ }
+ NormalizedNodeMessage nodeMessageResp = null;
+ if (rpcResult.value() != null) {
+ nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), rpcResult.value());
+ }
+ requester.tell(new InvokeRpcMessageReply(nodeMessageResp, rpcResult.errors()), self);
}
@Override
}, MoreExecutors.directExecutor());
}
- private void sendResult(final ListenableFuture<Optional<NormalizedNode<?, ?>>> feature,
- final YangInstanceIdentifier path,
- final ActorRef sender, final ActorRef self) {
- FluentFuture.from(feature).addCallback(new FutureCallback<>() {
+ private static void sendResult(final ListenableFuture<Optional<NormalizedNode>> feature,
+ final YangInstanceIdentifier path, final ActorRef sender, final ActorRef self) {
+ Futures.addCallback(feature, new FutureCallback<>() {
@Override
- public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
- if (!result.isPresent()) {
+ public void onSuccess(final Optional<NormalizedNode> result) {
+ if (result.isEmpty()) {
sender.tell(new EmptyReadResponse(), self);
return;
}