From: Tom Pantelis Date: Mon, 21 May 2018 14:28:52 +0000 (-0400) Subject: Convert ProxyDOMDataBroker tx creation to async X-Git-Tag: release/fluorine~60^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=e60546d08fbc89e4780406c4123ba25c3e7698ef;p=netconf.git Convert ProxyDOMDataBroker tx creation to async ProxyReadWriteTransaction was modified to take the ActorRef Future from the tx creation message and add a callback when complete. Transaction operations prior to completion are queued and replayed to a ProxyTransactionFacade instance once the Future is complete. For successful completion, an ActorProxyTransactionFacade is created that interfaces with the master ActorRef. On failed Future, a FailedProxyTransactionFacade is created that returns a failed Future for read, exists and commit operations. The ProxyReadAdapter and ProxyWriteAdapter were removed in lieu of ProxyTransactionFacade which combines the read/write operations for simplicity. Some simple actor response messages, eg NewWriteTransactionReply, were removed in lieu of returning the payload or exception via akka.actor.Status.Success or akka.actor.Status.Failure. This simplifies caller response handling as akka automatically unwraps Success and Failure in callbacks and synchronous await. UTs were added, modified and removed corresponding to the changes. Change-Id: I0bd2a931d91ded97ebba7ccc207d51bd6474a41c Signed-off-by: Tom Pantelis --- diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java index ac77b7b32a..3a70ed700c 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java @@ -130,7 +130,7 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler txActorFuture = Patterns.ask(masterNode, new NewReadTransactionRequest(), askTimeout); - final Object msg; - try { - msg = Await.result(txActorFuture, askTimeout.duration()); - } catch (Exception e) { - throw new IllegalStateException("Can't create ProxyReadTransaction", e); - } - - if (msg instanceof Exception) { - throw new IllegalStateException("Can't create ProxyReadTransaction", (Exception) msg); - } - - Verify.verify(msg instanceof NewReadTransactionReply); - final NewReadTransactionReply reply = (NewReadTransactionReply) msg; - return new ProxyReadTransaction(reply.getTxActor(), id, actorSystem, askTimeout); + return new ProxyReadTransaction(id, txActorFuture, executionContext, askTimeout); } @SuppressWarnings("checkstyle:IllegalCatch") @Override public DOMDataReadWriteTransaction newReadWriteTransaction() { final Future txActorFuture = Patterns.ask(masterNode, new NewReadWriteTransactionRequest(), askTimeout); - final Object msg; - try { - msg = Await.result(txActorFuture, askTimeout.duration()); - } catch (Exception e) { - throw new IllegalStateException("Can't create ProxyReadWriteTransaction", e); - } - - if (msg instanceof Exception) { - throw new IllegalStateException("Can't create ProxyReadWriteTransaction", (Exception) msg); - } - - Verify.verify(msg instanceof NewReadWriteTransactionReply); - final NewReadWriteTransactionReply reply = (NewReadWriteTransactionReply) msg; - return new ProxyReadWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout); + return new ProxyReadWriteTransaction(id, txActorFuture, executionContext, askTimeout); } @SuppressWarnings("checkstyle:IllegalCatch") @Override public DOMDataWriteTransaction newWriteOnlyTransaction() { final Future txActorFuture = Patterns.ask(masterNode, new NewWriteTransactionRequest(), askTimeout); - final Object msg; - try { - msg = Await.result(txActorFuture, askTimeout.duration()); - } catch (Exception e) { - throw new IllegalStateException("Can't create ProxyWriteTransaction", e); - } - - if (msg instanceof Exception) { - throw new IllegalStateException("Can't create ProxyWriteTransaction", (Exception) msg); - } - - Verify.verify(msg instanceof NewWriteTransactionReply); - final NewWriteTransactionReply reply = (NewWriteTransactionReply) msg; - return new ProxyWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout); + return new ProxyReadWriteTransaction(id, txActorFuture, executionContext, askTimeout); } @Override diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java index 795ba78c75..d2d8c5f5a4 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java @@ -50,7 +50,7 @@ public class SlaveSalFacade { final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService(); final ProxyDOMDataBroker netconfDeviceDataBroker = - new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime); + new ProxyDOMDataBroker(id, masterActorRef, actorSystem.dispatcher(), actorResponseWaitTime); salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker, deviceRpc, notificationService); diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index 67d0b52a7c..e8415f62fe 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -56,11 +56,8 @@ import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSource import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage; import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -156,25 +153,23 @@ public class NetconfNodeActor extends AbstractUntypedActor { sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender()); } else if (message instanceof NewReadTransactionRequest) { // master - - sender().tell(new NewReadTransactionReply(readTxActor), self()); - + sender().tell(new Success(readTxActor), self()); } else if (message instanceof NewWriteTransactionRequest) { // master try { final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction(); final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout)); - sender().tell(new NewWriteTransactionReply(txActor), self()); + sender().tell(new Success(txActor), self()); } catch (final Exception t) { - sender().tell(t, self()); + sender().tell(new Failure(t), self()); } } else if (message instanceof NewReadWriteTransactionRequest) { try { final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction(); final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout)); - sender().tell(new NewReadWriteTransactionReply(txActor), self()); + sender().tell(new Success(txActor), self()); } catch (final Exception t) { - sender().tell(t, self()); + sender().tell(new Failure(t), self()); } } else if (message instanceof InvokeRpcMessage) { // master final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message; @@ -223,6 +218,7 @@ public class NetconfNodeActor extends AbstractUntypedActor { @Override public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) { try { + LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier); sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf()); } catch (IOException e) { sender.tell(new Failure(e), getSelf()); @@ -231,6 +227,7 @@ public class NetconfNodeActor extends AbstractUntypedActor { @Override public void onFailure(@Nonnull final Throwable throwable) { + LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable); sender.tell(new Failure(throwable), getSelf()); } }, MoreExecutors.directExecutor()); diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java index 6b79f62125..e3fb37f219 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java @@ -9,6 +9,7 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; import akka.actor.ActorRef; +import akka.actor.Status.Failure; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; @@ -66,7 +67,7 @@ class ReadAdapter { @Override public void onFailure(@Nonnull final Throwable throwable) { - sender.tell(throwable, self); + sender.tell(new Failure(throwable), self); } }, MoreExecutors.directExecutor()); } @@ -86,7 +87,7 @@ class ReadAdapter { @Override public void onFailure(@Nonnull final Throwable throwable) { - sender.tell(throwable, self); + sender.tell(new Failure(throwable), self); } }, MoreExecutors.directExecutor()); } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java index a214dd4fc7..250b893099 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java @@ -10,6 +10,8 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; import akka.actor.ActorContext; import akka.actor.ActorRef; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -22,8 +24,6 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelR import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,12 +50,12 @@ class WriteAdapter { Futures.addCallback(submitFuture, new FutureCallback() { @Override public void onSuccess(final Void result) { - requester.tell(new SubmitReply(), self); + requester.tell(new Success(null), self); } @Override public void onFailure(@Nonnull final Throwable throwable) { - requester.tell(new SubmitFailedReply(throwable), self); + requester.tell(new Failure(throwable), self); } }, MoreExecutors.directExecutor()); } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ActorProxyTransactionFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ActorProxyTransactionFacade.java new file mode 100644 index 0000000000..3ab1cd9752 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ActorProxyTransactionFacade.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Objects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.MappingCheckedFuture; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; + +/** + * ProxyTransactionFacade implementation that interfaces with an actor. + * + * @author Thomas Pantelis + */ +class ActorProxyTransactionFacade implements ProxyTransactionFacade { + private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class); + + private final ActorRef masterTxActor; + private final RemoteDeviceId id; + private final ExecutionContext executionContext; + private final Timeout askTimeout; + + ActorProxyTransactionFacade(ActorRef masterTxActor, RemoteDeviceId id, ExecutionContext executionContext, + Timeout askTimeout) { + this.masterTxActor = Objects.requireNonNull(masterTxActor); + this.id = Objects.requireNonNull(id); + this.executionContext = Objects.requireNonNull(executionContext); + this.askTimeout = Objects.requireNonNull(askTimeout); + } + + @Override + public Object getIdentifier() { + return id; + } + + @Override + public boolean cancel() { + LOG.debug("{}: Cancel via actor {}", id, masterTxActor); + + final Future future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout); + + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + if (failure != null) { + LOG.warn("{}: Cancel failed", id, failure); + return; + } + + LOG.debug("{}: Cancel succeeded", id); + } + }, executionContext); + + return true; + } + + @Override + public CheckedFuture>, ReadFailedException> read(LogicalDatastoreType store, + YangInstanceIdentifier path) { + LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor); + + final Future future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout); + + final SettableFuture>> settableFuture = SettableFuture.create(); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + if (failure != null) { + LOG.debug("{}: Read {} {} failed", id, store, path, failure); + settableFuture.setException(processFailure(failure)); + return; + } + + LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response); + + if (response instanceof EmptyReadResponse) { + settableFuture.set(Optional.absent()); + return; + } + + if (response instanceof NormalizedNodeMessage) { + final NormalizedNodeMessage data = (NormalizedNodeMessage) response; + settableFuture.set(Optional.of(data.getNode())); + } + } + }, executionContext); + + return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER); + } + + @Override + public CheckedFuture exists(LogicalDatastoreType store, YangInstanceIdentifier path) { + LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor); + + final Future future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout); + + final SettableFuture settableFuture = SettableFuture.create(); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + if (failure != null) { + LOG.debug("{}: Exists {} {} failed", id, store, path, failure); + settableFuture.setException(processFailure(failure)); + return; + } + + LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response); + + settableFuture.set((Boolean) response); + } + }, executionContext); + + return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER); + } + + @Override + public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) { + LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor); + masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender()); + } + + @Override + public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode data) { + LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor); + masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender()); + } + + @Override + public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode data) { + LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor); + masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender()); + } + + @Override + public @NonNull FluentFuture commit() { + LOG.debug("{}: Commit via actor {}", id, masterTxActor); + + final Future future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout); + + final SettableFuture settableFuture = SettableFuture.create(); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + if (failure != null) { + LOG.debug("{}: Commit failed", id, failure); + settableFuture.setException(newTransactionCommitFailedException(processFailure(failure))); + return; + } + + LOG.debug("{}: Commit succeeded", id); + + settableFuture.set(CommitInfo.empty()); + } + }, executionContext); + + return settableFuture; + } + + private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) { + return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed", getIdentifier()), + failure); + } + + private Throwable processFailure(Throwable failure) { + if (failure instanceof AskTimeoutException) { + return NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure); + } + + return failure; + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/FailedProxyTransactionFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/FailedProxyTransactionFacade.java new file mode 100644 index 0000000000..97d51cb730 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/FailedProxyTransactionFacade.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.Futures; +import java.util.Objects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of ProxyTransactionFacade that fails each request. + * + * @author Thomas Pantelis + */ +class FailedProxyTransactionFacade implements ProxyTransactionFacade { + private static final Logger LOG = LoggerFactory.getLogger(FailedProxyTransactionFacade.class); + + private final RemoteDeviceId id; + private final Throwable failure; + + FailedProxyTransactionFacade(RemoteDeviceId id, Throwable failure) { + this.id = Objects.requireNonNull(id); + this.failure = Objects.requireNonNull(failure); + } + + @Override + public Object getIdentifier() { + return id; + } + + @Override + public boolean cancel() { + return true; + } + + @Override + public CheckedFuture>, ReadFailedException> read(LogicalDatastoreType store, + YangInstanceIdentifier path) { + LOG.debug("{}: Read {} {} - failure {}", id, store, path, failure); + return Futures.immediateFailedCheckedFuture(ReadFailedException.MAPPER.apply( + failure instanceof Exception ? (Exception)failure : new ReadFailedException("read", failure))); + } + + @Override + public CheckedFuture exists(LogicalDatastoreType store, YangInstanceIdentifier path) { + LOG.debug("{}: Exists {} {} - failure {}", id, store, path, failure); + return Futures.immediateFailedCheckedFuture(ReadFailedException.MAPPER.apply( + failure instanceof Exception ? (Exception)failure : new ReadFailedException("read", failure))); + } + + @Override + public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) { + LOG.debug("{}: Delete {} {} - failure {}", id, store, path, failure); + } + + @Override + public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode data) { + LOG.debug("{}: Put {} {} - failure {}", id, store, path, failure); + } + + @Override + public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode data) { + LOG.debug("{}: Merge {} {} - failure {}", id, store, path, failure); + } + + @Override + public @NonNull FluentFuture commit() { + LOG.debug("{}: Commit {} {} - failure {}", id, failure); + return FluentFuture.from(Futures.immediateFailedFuture(failure instanceof Exception + ? AsyncWriteTransaction.SUBMIT_EXCEPTION_MAPPER.apply((Exception)failure) + : new TransactionCommitFailedException("commit", failure))); + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java deleted file mode 100644 index 0f23c11f2e..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; -import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; - -class ProxyReadAdapter { - private static final Logger LOG = LoggerFactory.getLogger(ProxyReadAdapter.class); - - private final ActorRef masterTxActor; - private final RemoteDeviceId id; - private final ActorSystem actorSystem; - private final Timeout askTimeout; - - ProxyReadAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, - final Timeout askTimeout) { - this.masterTxActor = masterTxActor; - this.id = id; - this.actorSystem = actorSystem; - this.askTimeout = askTimeout; - } - - public void close() { - //noop - } - - public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { - LOG.trace("{}: Read {} via NETCONF: {}", id, store, path); - - final Future future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout); - final SettableFuture>> settableFuture = SettableFuture.create(); - future.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, - final Object success) throws Throwable { - if (failure != null) { // ask timeout - final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); - settableFuture.setException(exception); - return; - } - if (success instanceof Throwable) { // Error sended by master - settableFuture.setException((Throwable) success); - return; - } - if (success instanceof EmptyReadResponse) { - settableFuture.set(Optional.absent()); - return; - } - if (success instanceof NormalizedNodeMessage) { - final NormalizedNodeMessage data = (NormalizedNodeMessage) success; - settableFuture.set(Optional.of(data.getNode())); - } - } - }, actorSystem.dispatcher()); - return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER); - } - - public CheckedFuture exists(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { - final Future existsScalaFuture = - Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout); - - LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path); - - final SettableFuture settableFuture = SettableFuture.create(); - existsScalaFuture.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object success) throws Throwable { - if (failure != null) { // ask timeout - final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); - settableFuture.setException(exception); - return; - } - if (success instanceof Throwable) { - settableFuture.setException((Throwable) success); - return; - } - settableFuture.set((Boolean) success); - } - }, actorSystem.dispatcher()); - return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER); - } - -} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java index b73c9466e1..ed972c1d5d 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java @@ -9,58 +9,25 @@ package org.opendaylight.netconf.topology.singleton.impl.tx; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; /** * ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor}. */ -public class ProxyReadTransaction implements DOMDataReadOnlyTransaction { +public class ProxyReadTransaction extends ProxyReadWriteTransaction implements DOMDataReadOnlyTransaction { - private final ProxyReadAdapter delegate; - - /** - * Constructor for {@code ProxyReadTransaction}. - * - * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref - * @param id device id - * @param actorSystem system - * @param askTimeout timeout - */ - public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, - final Timeout askTimeout) { - delegate = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout); + public ProxyReadTransaction(final RemoteDeviceId id, final Future masterTxActorFuture, + final ExecutionContext executionContext, final Timeout askTimeout) { + super(id, masterTxActorFuture, executionContext, askTimeout); } @Override public void close() { - delegate.close(); - } - - @Override - public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { - return delegate.read(store, path); - } - - @Override - public CheckedFuture exists(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { - return delegate.exists(store, path); - } - - - @Override - public Object getIdentifier() { - return this; + // noop } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java index 5cd2418312..82fbc41e05 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java @@ -9,100 +9,188 @@ package org.opendaylight.netconf.topology.singleton.impl.tx; import akka.actor.ActorRef; -import akka.actor.ActorSystem; +import akka.dispatch.OnComplete; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import javax.annotation.concurrent.GuardedBy; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.MappingCheckedFuture; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.yangtools.util.concurrent.ExceptionMapper; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; /** * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}. */ public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction { + private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class); - private final ProxyReadAdapter delegateRead; - private final ProxyWriteAdapter delegateWrite; - - /** - * Constructor for {@code ProxyReadWriteTransaction}. - * - * @param masterTxActor - * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor} ref - * @param id device id - * @param actorSystem system - * @param askTimeout timeout - */ - public ProxyReadWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, - final ActorSystem actorSystem, final Timeout askTimeout) { - delegateRead = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout); - delegateWrite = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout); + private final RemoteDeviceId id; + private final AtomicBoolean opened = new AtomicBoolean(true); + + @GuardedBy("queuedTxOperations") + private final List> queuedTxOperations = new ArrayList<>(); + + private volatile ProxyTransactionFacade transactionFacade; + + public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future masterTxActorFuture, + final ExecutionContext executionContext, final Timeout askTimeout) { + this.id = id; + + masterTxActorFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object masterTxActor) { + final ProxyTransactionFacade newTransactionFacade; + if (failure != null) { + LOG.debug("{}: Failed to obtain master actor", id, failure); + newTransactionFacade = new FailedProxyTransactionFacade(id, failure); + } else { + LOG.debug("{}: Obtained master actor {}", id, masterTxActor); + newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id, + executionContext, askTimeout); + } + + executePriorTransactionOperations(newTransactionFacade); + } + }, executionContext); } @Override public boolean cancel() { - return delegateWrite.cancel(); + if (!opened.compareAndSet(true, false)) { + return false; + } + + processTransactionOperation(facade -> facade.cancel()); + return true; } @Override public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { - return delegateRead.read(store, path); - } + final YangInstanceIdentifier path) { + LOG.debug("{}: Read {} {}", id, store, path); - @Override - public CheckedFuture exists(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { - return delegateRead.exists(store, path); + final SettableFuture>> returnFuture = SettableFuture.create(); + processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path))); + return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } @Override - public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { - delegateWrite.delete(store, path); - } + public CheckedFuture exists(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + LOG.debug("{}: Exists {} {}", id, store, path); - @Override - public CheckedFuture submit() { - return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()), - new ExceptionMapper("commit", TransactionCommitFailedException.class) { - @Override - protected TransactionCommitFailedException newWithCause(String message, Throwable cause) { - return new TransactionCommitFailedException(message, cause); - } - }); + final SettableFuture returnFuture = SettableFuture.create(); + processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path))); + return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } @Override - public @NonNull FluentFuture commit() { - return delegateWrite.commit(getIdentifier()); + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + checkOpen(); + LOG.debug("{}: Delete {} {}", id, store, path); + processTransactionOperation(facade -> facade.delete(store, path)); } @Override public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { - delegateWrite.put(store, path, data, getIdentifier()); + checkOpen(); + LOG.debug("{}: Put {} {}", id, store, path); + processTransactionOperation(facade -> facade.put(store, path, data)); } @Override public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) { - delegateWrite.merge(store, path, data, getIdentifier()); + checkOpen(); + LOG.debug("{}: Merge {} {}", id, store, path); + processTransactionOperation(facade -> facade.merge(store, path, data)); + } + + @Override + public @NonNull FluentFuture commit() { + Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id); + LOG.debug("{}: Commit", id); + + final SettableFuture returnFuture = SettableFuture.create(); + processTransactionOperation(facade -> returnFuture.setFuture(facade.commit())); + return returnFuture; } @Override public Object getIdentifier() { - return this; + return id; + } + + private void processTransactionOperation(final Consumer operation) { + final ProxyTransactionFacade facadeOnEntry; + synchronized (queuedTxOperations) { + if (transactionFacade == null) { + LOG.debug("{}: Queuing transaction operation", id); + + queuedTxOperations.add(operation); + facadeOnEntry = null; + } else { + facadeOnEntry = transactionFacade; + } + } + + if (facadeOnEntry != null) { + operation.accept(facadeOnEntry); + } + } + + private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) { + while (true) { + // Access to queuedTxOperations and transactionFacade must be protected and atomic + // (ie synchronized) with respect to #processTransactionOperation to handle timing + // issues and ensure no ProxyTransactionFacade is missed and that they are processed + // in the order they occurred. + + // We'll make a local copy of the queuedTxOperations list to handle re-entrancy + // in case a transaction operation results in another transaction operation being + // queued (eg a put operation from a client read Future callback that is notified + // synchronously). + final Collection> operationsBatch; + synchronized (queuedTxOperations) { + if (queuedTxOperations.isEmpty()) { + // We're done invoking the transaction operations so we can now publish the + // ProxyTransactionFacade. + transactionFacade = newTransactionFacade; + break; + } + + operationsBatch = new ArrayList<>(queuedTxOperations); + queuedTxOperations.clear(); + } + + // Invoke transaction operations outside the sync block to avoid unnecessary blocking. + for (Consumer oper : operationsBatch) { + oper.accept(newTransactionFacade); + } + } + } + + private void checkOpen() { + Preconditions.checkState(opened.get(), "%s: Transaction is closed", id); } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyTransactionFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyTransactionFacade.java new file mode 100644 index 0000000000..babac0dda9 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyTransactionFacade.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; + +/** + * Interfaces with a transaction back-end. + * + * @author Thomas Pantelis + */ +interface ProxyTransactionFacade extends DOMDataReadWriteTransaction { +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java deleted file mode 100644 index ccd58fb9e2..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; -import akka.util.Timeout; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.mdsal.common.api.CommitInfo; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.Future; - -public class ProxyWriteAdapter { - - private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteAdapter.class); - - private final ActorRef masterTxActor; - private final RemoteDeviceId id; - private final ActorSystem actorSystem; - private final AtomicBoolean opened = new AtomicBoolean(true); - private final Timeout askTimeout; - - public ProxyWriteAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, - final Timeout askTimeout) { - this.masterTxActor = masterTxActor; - this.id = id; - this.actorSystem = actorSystem; - this.askTimeout = askTimeout; - } - - @SuppressWarnings("checkstyle:IllegalCatch") - public boolean cancel() { - if (!opened.compareAndSet(true, false)) { - return false; - } - final Future cancelScalaFuture = - Patterns.ask(masterTxActor, new CancelRequest(), askTimeout); - - LOG.trace("{}: Cancel {} via NETCONF", id); - - try { - // here must be Await because AsyncWriteTransaction do not return future - return (boolean) Await.result(cancelScalaFuture, askTimeout.duration()); - } catch (final Exception e) { - return false; - } - } - - public @NonNull FluentFuture commit(final Object identifier) { - if (!opened.compareAndSet(true, false)) { - throw new IllegalStateException(id + ": Transaction" + identifier + " is closed"); - } - final Future submitScalaFuture = - Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout); - - LOG.trace("{}: Commit {} via NETCONF", id); - - final SettableFuture settableFuture = SettableFuture.create(); - submitScalaFuture.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object success) throws Throwable { - if (failure != null) { // ask timeout - settableFuture.setException(newTransactionCommitFailedException( - NetconfTopologyUtils.createMasterIsDownException(id), identifier)); - return; - } - if (success instanceof Throwable) { - settableFuture.setException(newTransactionCommitFailedException((Throwable) success, identifier)); - } else { - if (success instanceof SubmitFailedReply) { - LOG.error("{}: Transaction was not submitted because already closed.", id); - settableFuture.setException(newTransactionCommitFailedException( - ((SubmitFailedReply) success).getThrowable(), identifier)); - return; - } - - settableFuture.set(CommitInfo.empty()); - } - } - }, actorSystem.dispatcher()); - - return FluentFuture.from(settableFuture); - } - - private static TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure, - final Object identifier) { - return new TransactionCommitFailedException( - String.format("Commit of transaction %s failed", identifier), failure); - } - - public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) { - Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier); - LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier); - masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender()); - } - - public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, - final NormalizedNode data, final Object identifier) { - Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier); - final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data); - LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, path, data); - masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender()); - } - - public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, - final NormalizedNode data, final Object identifier) { - Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier); - final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data); - LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data); - masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender()); - } - -} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java deleted file mode 100644 index d2840235e9..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.util.Timeout; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.MoreExecutors; -import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.mdsal.common.api.CommitInfo; -import org.opendaylight.mdsal.common.api.MappingCheckedFuture; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.yangtools.util.concurrent.ExceptionMapper; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - -/** - * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master - * {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor}. - */ -public class ProxyWriteTransaction implements DOMDataWriteTransaction { - - private final ProxyWriteAdapter proxyWriteAdapter; - - /** - * Constructor for {@code ProxyWriteTransaction}. - * - * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref - * @param id device id - * @param actorSystem system - * @param askTimeout timeout - */ - public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, - final Timeout askTimeout) { - proxyWriteAdapter = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout); - } - - @SuppressWarnings("checkstyle:IllegalCatch") - @Override - public boolean cancel() { - return proxyWriteAdapter.cancel(); - } - - @Override - public CheckedFuture submit() { - return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()), - new ExceptionMapper("commit", TransactionCommitFailedException.class) { - @Override - protected TransactionCommitFailedException newWithCause(String message, Throwable cause) { - return new TransactionCommitFailedException(message, cause); - } - }); - } - - @Override - public @NonNull FluentFuture commit() { - return proxyWriteAdapter.commit(getIdentifier()); - } - - @Override - public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) { - proxyWriteAdapter.delete(store, identifier); - } - - @Override - public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier, - final NormalizedNode data) { - proxyWriteAdapter.put(store, identifier, data, getIdentifier()); - } - - @Override - public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier, - final NormalizedNode data) { - proxyWriteAdapter.merge(store, identifier, data, getIdentifier()); - } - - @Override - public Object getIdentifier() { - return this; - } -} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java index 474619ee24..eb34152e93 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java @@ -232,8 +232,8 @@ public final class NetconfTopologyUtils { return createTopologyListPath(topologyId).child(Node.class); } - public static DocumentedException createMasterIsDownException(final RemoteDeviceId id) { - return new DocumentedException(id + ":Master is down. Please try again.", + public static DocumentedException createMasterIsDownException(final RemoteDeviceId id, final Exception cause) { + return new DocumentedException(id + ":Master is down. Please try again.", cause, DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED, DocumentedException.ErrorSeverity.WARNING); } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java deleted file mode 100644 index f2fcd0c55b..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.messages.transactions; - -import akka.actor.ActorRef; -import java.io.Serializable; - -public class NewReadTransactionReply implements Serializable { - private static final long serialVersionUID = 1L; - - private final ActorRef txActor; - - public NewReadTransactionReply(final ActorRef txActor) { - this.txActor = txActor; - } - - public ActorRef getTxActor() { - return txActor; - } -} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java deleted file mode 100644 index 08e293059e..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.messages.transactions; - -import akka.actor.ActorRef; -import java.io.Serializable; - -public class NewReadWriteTransactionReply implements Serializable { - private static final long serialVersionUID = 1L; - - private final ActorRef txActor; - - public NewReadWriteTransactionReply(final ActorRef txActor) { - this.txActor = txActor; - } - - public ActorRef getTxActor() { - return txActor; - } -} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java deleted file mode 100644 index 8d5e8181fc..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.messages.transactions; - -import akka.actor.ActorRef; -import java.io.Serializable; - -public class NewWriteTransactionReply implements Serializable { - private static final long serialVersionUID = 1L; - - private final ActorRef txActor; - - public NewWriteTransactionReply(final ActorRef txActor) { - this.txActor = txActor; - } - - public ActorRef getTxActor() { - return txActor; - } -} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitFailedReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitFailedReply.java deleted file mode 100644 index 7dfc19c0c6..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitFailedReply.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.messages.transactions; - -import java.io.Serializable; - -/** - * Message sent from master back to the slave when submit fails, with the offending exception attached. - */ -public class SubmitFailedReply implements Serializable { - private static final long serialVersionUID = 1L; - - private final Throwable throwable; - - public SubmitFailedReply(final Throwable throwable) { - this.throwable = throwable; - } - - public Throwable getThrowable() { - return throwable; - } -} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java deleted file mode 100644 index 5ca306c6cd..0000000000 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.messages.transactions; - -import java.io.Serializable; - -/** - * Message sent from master back to the slave when submit is successfully performed. - */ -public class SubmitReply implements Serializable { - private static final long serialVersionUID = 1L; -} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBrokerTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBrokerTest.java new file mode 100644 index 0000000000..b1bc972576 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBrokerTest.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import akka.actor.ActorSystem; +import akka.actor.Status.Success; +import akka.testkit.TestProbe; +import akka.testkit.javadsl.TestKit; +import akka.util.Timeout; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; +import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Unit tests for ProxyDOMDataBroker. + * + * @author Thomas Pantelis + */ +public class ProxyDOMDataBrokerTest { + private static final RemoteDeviceId DEVICE_ID = + new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830)); + + private static ActorSystem system = ActorSystem.apply(); + + private final TestProbe masterActor = new TestProbe(system); + private final ProxyDOMDataBroker proxy = new ProxyDOMDataBroker(DEVICE_ID, masterActor.ref(), system.dispatcher(), + Timeout.apply(5, TimeUnit.SECONDS)); + + @AfterClass + public static void staticTearDown() { + TestKit.shutdownActorSystem(system, Boolean.TRUE); + } + + @Test + public void testNewReadOnlyTransaction() { + final DOMDataReadOnlyTransaction tx = proxy.newReadOnlyTransaction(); + masterActor.expectMsgClass(NewReadTransactionRequest.class); + masterActor.reply(new Success(masterActor.ref())); + + assertEquals(DEVICE_ID, tx.getIdentifier()); + + tx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); + masterActor.expectMsgClass(ReadRequest.class); + } + + @Test + public void testNewWriteOnlyTransaction() { + final DOMDataWriteTransaction tx = proxy.newWriteOnlyTransaction(); + masterActor.expectMsgClass(NewWriteTransactionRequest.class); + masterActor.reply(new Success(masterActor.ref())); + + assertEquals(DEVICE_ID, tx.getIdentifier()); + + tx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); + masterActor.expectMsgClass(DeleteRequest.class); + } + + @Test + public void testNewReadWriteTransaction() { + final DOMDataReadWriteTransaction tx = proxy.newReadWriteTransaction(); + masterActor.expectMsgClass(NewReadWriteTransactionRequest.class); + masterActor.reply(new Success(masterActor.ref())); + + assertEquals(DEVICE_ID, tx.getIdentifier()); + + tx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); + masterActor.expectMsgClass(ReadRequest.class); + } + + @Test(expected = UnsupportedOperationException.class) + public void testCreateTransactionChain() { + proxy.createTransactionChain(null); + } + + @Test + public void testGetSupportedExtensions() { + assertTrue(proxy.getSupportedExtensions().isEmpty()); + } +} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java index d41e6955f8..d39a8ed8e3 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java @@ -8,98 +8,29 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import akka.testkit.TestProbe; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; -import org.junit.After; +import akka.testkit.javadsl.TestKit; +import org.junit.AfterClass; import org.junit.Before; -import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; - -public class ReadTransactionActorTest { - private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; - private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; +public class ReadTransactionActorTest extends ReadTransactionActorTestAdapter { + private static ActorSystem system = ActorSystem.apply(); @Mock - private DOMDataReadOnlyTransaction deviceReadTx; - private TestProbe probe; - private ActorSystem system; - private TestActorRef actorRef; + private DOMDataReadOnlyTransaction mockReadTx; @Before - public void setUp() throws Exception { + public void setUp() { MockitoAnnotations.initMocks(this); - system = ActorSystem.apply(); - probe = TestProbe.apply(system); - actorRef = TestActorRef.create(system, ReadTransactionActor.props(deviceReadTx), "testA"); - } - - @After - public void tearDown() throws Exception { - JavaTestKit.shutdownActorSystem(system, null, true); - } - - @Test - public void testRead() throws Exception { - final ContainerNode node = Builders.containerBuilder() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))) - .build(); - when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node))); - actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); - verify(deviceReadTx).read(STORE, PATH); - probe.expectMsgClass(NormalizedNodeMessage.class); - } - - @Test - public void testReadEmpty() throws Exception { - when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent())); - actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); - verify(deviceReadTx).read(STORE, PATH); - probe.expectMsgClass(EmptyReadResponse.class); - } - - @Test - public void testReadFailure() throws Exception { - final ReadFailedException cause = new ReadFailedException("fail"); - when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); - actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); - verify(deviceReadTx).read(STORE, PATH); - probe.expectMsg(cause); - } - - @Test - public void testExists() throws Exception { - when(deviceReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true)); - actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); - verify(deviceReadTx).exists(STORE, PATH); - probe.expectMsg(true); + init(mockReadTx, system, TestActorRef.create(system, ReadTransactionActor.props(mockReadTx))); } - @Test - public void testExistsFailure() throws Exception { - final ReadFailedException cause = new ReadFailedException("fail"); - when(deviceReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); - actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); - verify(deviceReadTx).exists(STORE, PATH); - probe.expectMsg(cause); + @AfterClass + public static void staticTearDown() { + TestKit.shutdownActorSystem(system, Boolean.TRUE); } -} \ No newline at end of file +} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTestAdapter.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTestAdapter.java new file mode 100644 index 0000000000..3d7f2b7c5a --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTestAdapter.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.impl.actors; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Status.Failure; +import akka.testkit.TestProbe; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Futures; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; + +/** + * Adapter for read transaction tests. + * + * @author Thomas Pantelis + */ +public abstract class ReadTransactionActorTestAdapter { + static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; + static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; + static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS); + static final NormalizedNode NODE = Builders.containerBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))).build(); + + private DOMDataReadTransaction mockReadTx; + private TestProbe probe; + private ActorRef actorRef; + + public void init(DOMDataReadTransaction inMockReadTx, ActorSystem system, ActorRef inActorRef) { + this.mockReadTx = inMockReadTx; + this.probe = TestProbe.apply(system); + this.actorRef = inActorRef; + } + + @Test + public void testRead() { + when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(NODE))); + actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); + + verify(mockReadTx).read(STORE, PATH); + final NormalizedNodeMessage response = probe.expectMsgClass(NormalizedNodeMessage.class); + assertEquals(NODE, response.getNode()); + } + + @Test + public void testReadEmpty() { + when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent())); + actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); + + verify(mockReadTx).read(STORE, PATH); + probe.expectMsgClass(EmptyReadResponse.class); + } + + @Test + public void testReadFailure() { + final ReadFailedException cause = new ReadFailedException("fail"); + when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); + actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); + + verify(mockReadTx).read(STORE, PATH); + final Failure response = probe.expectMsgClass(Failure.class); + assertEquals(cause, response.cause()); + } + + @Test + public void testExists() { + when(mockReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true)); + actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); + + verify(mockReadTx).exists(STORE, PATH); + probe.expectMsg(true); + } + + @Test + public void testExistsFailure() { + final ReadFailedException cause = new ReadFailedException("fail"); + when(mockReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); + actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); + + verify(mockReadTx).exists(STORE, PATH); + final Failure response = probe.expectMsgClass(Failure.class); + assertEquals(cause, response.cause()); + } +} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java index 2ccf874449..dc4d81a8f6 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java @@ -8,179 +8,98 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import akka.testkit.TestProbe; -import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; +import akka.testkit.javadsl.TestKit; import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Assert; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; public class ReadWriteTransactionActorTest { - - private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; - private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; - private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS); + private static ActorSystem system = ActorSystem.apply(); @Mock - private DOMDataReadWriteTransaction deviceReadWriteTx; - private TestProbe probe; - private ActorSystem system; - private TestActorRef actorRef; - private ContainerNode node; + private DOMDataReadWriteTransaction mockReadWriteTx; + + private final ReadTransactionActorTestAdapter readTestAdapter = new ReadTransactionActorTestAdapter() {}; + private final WriteTransactionActorTestAdapter writeTestAdapter = new WriteTransactionActorTestAdapter() {}; @Before - public void setUp() throws Exception { + public void setUp() { MockitoAnnotations.initMocks(this); - system = ActorSystem.apply(); - probe = TestProbe.apply(system); - node = Builders.containerBuilder() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))) - .build(); - actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(deviceReadWriteTx, - Duration.apply(2, TimeUnit.SECONDS)), "testA"); + TestActorRef actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(mockReadWriteTx, + Duration.apply(2, TimeUnit.SECONDS))); + readTestAdapter.init(mockReadWriteTx, system, actorRef); + writeTestAdapter.init(mockReadWriteTx, system, actorRef); } - @After - public void tearDown() throws Exception { - JavaTestKit.shutdownActorSystem(system, null, true); + @AfterClass + public static void staticTearDown() { + TestKit.shutdownActorSystem(system, Boolean.TRUE); } @Test - public void testRead() throws Exception { - when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node))); - actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); - verify(deviceReadWriteTx).read(STORE, PATH); - probe.expectMsgClass(NormalizedNodeMessage.class); + public void testRead() { + readTestAdapter.testRead(); } @Test - public void testReadEmpty() throws Exception { - when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent())); - actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); - verify(deviceReadWriteTx).read(STORE, PATH); - probe.expectMsgClass(EmptyReadResponse.class); + public void testReadEmpty() { + readTestAdapter.testReadEmpty(); } @Test - public void testReadFailure() throws Exception { - final ReadFailedException cause = new ReadFailedException("fail"); - when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); - actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); - verify(deviceReadWriteTx).read(STORE, PATH); - probe.expectMsg(cause); + public void testReadFailure() { + readTestAdapter.testReadFailure(); } @Test - public void testExists() throws Exception { - when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true)); - actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); - verify(deviceReadWriteTx).exists(STORE, PATH); - probe.expectMsg(true); + public void testExists() { + readTestAdapter.testExists(); } @Test - public void testExistsFailure() throws Exception { - final ReadFailedException cause = new ReadFailedException("fail"); - when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); - actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); - verify(deviceReadWriteTx).exists(STORE, PATH); - probe.expectMsg(cause); + public void testExistsFailure() { + readTestAdapter.testExistsFailure(); } @Test - public void testPut() throws Exception { - final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node); - actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref()); - verify(deviceReadWriteTx).put(STORE, PATH, node); + public void testPut() { + writeTestAdapter.testPut(); } @Test - public void testMerge() throws Exception { - final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node); - actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref()); - verify(deviceReadWriteTx).merge(STORE, PATH, node); + public void testMerge() { + writeTestAdapter.testMerge(); } @Test - public void testDelete() throws Exception { - actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref()); - verify(deviceReadWriteTx).delete(STORE, PATH); + public void testDelete() { + writeTestAdapter.testDelete(); } @Test public void testCancel() throws Exception { - when(deviceReadWriteTx.cancel()).thenReturn(true); - final Future cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT); - final Object result = Await.result(cancelFuture, TIMEOUT.duration()); - Assert.assertTrue(result instanceof Boolean); - verify(deviceReadWriteTx).cancel(); - Assert.assertTrue((Boolean) result); + writeTestAdapter.testCancel(); } @Test public void testSubmit() throws Exception { - when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null)); - final Future submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT); - final Object result = Await.result(submitFuture, TIMEOUT.duration()); - Assert.assertTrue(result instanceof SubmitReply); - verify(deviceReadWriteTx).submit(); + writeTestAdapter.testSubmit(); } @Test public void testSubmitFail() throws Exception { - final RpcError rpcError = - RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail"); - final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError); - when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause)); - final Future submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT); - final Object result = Await.result(submitFuture, TIMEOUT.duration()); - Assert.assertTrue(result instanceof SubmitFailedReply); - Assert.assertEquals(cause, ((SubmitFailedReply)result).getThrowable()); - verify(deviceReadWriteTx).submit(); + writeTestAdapter.testSubmitFail(); } @Test public void testIdleTimeout() throws Exception { - final TestProbe testProbe = new TestProbe(system); - testProbe.watch(actorRef); - verify(deviceReadWriteTx, timeout(3000)).cancel(); - testProbe.expectTerminated(actorRef, TIMEOUT.duration()); + writeTestAdapter.testIdleTimeout(); } } diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java index e21ec46e9c..759c11209c 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java @@ -8,132 +8,32 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import akka.testkit.TestProbe; -import akka.util.Timeout; -import com.google.common.util.concurrent.Futures; +import akka.testkit.javadsl.TestKit; import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Assert; +import org.junit.AfterClass; import org.junit.Before; -import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; -public class WriteTransactionActorTest { - private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; - private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; - private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS); +public class WriteTransactionActorTest extends WriteTransactionActorTestAdapter { + private static ActorSystem system = ActorSystem.apply(); @Mock - private DOMDataWriteTransaction deviceWriteTx; - private TestProbe probe; - private ActorSystem system; - private TestActorRef actorRef; - private NormalizedNode node; + private DOMDataWriteTransaction mockWriteTx; @Before - public void setUp() throws Exception { + public void setUp() { MockitoAnnotations.initMocks(this); - system = ActorSystem.apply(); - probe = TestProbe.apply(system); - node = Builders.containerBuilder() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))) - .build(); - actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx, - Duration.apply(2, TimeUnit.SECONDS)), "testA"); - } - - @After - public void tearDown() throws Exception { - JavaTestKit.shutdownActorSystem(system, null, true); - } - - @Test - public void testPut() throws Exception { - final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node); - actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref()); - verify(deviceWriteTx).put(STORE, PATH, node); - } - - @Test - public void testMerge() throws Exception { - final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node); - actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref()); - verify(deviceWriteTx).merge(STORE, PATH, node); - } - - @Test - public void testDelete() throws Exception { - actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref()); - verify(deviceWriteTx).delete(STORE, PATH); - } - - @Test - public void testCancel() throws Exception { - when(deviceWriteTx.cancel()).thenReturn(true); - final Future cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT); - final Object result = Await.result(cancelFuture, TIMEOUT.duration()); - Assert.assertTrue(result instanceof Boolean); - verify(deviceWriteTx).cancel(); - Assert.assertTrue((Boolean) result); - } - - @Test - public void testSubmit() throws Exception { - when(deviceWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null)); - final Future submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT); - final Object result = Await.result(submitFuture, TIMEOUT.duration()); - Assert.assertTrue(result instanceof SubmitReply); - verify(deviceWriteTx).submit(); - } - - @Test - public void testSubmitFail() throws Exception { - final RpcError rpcError = - RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail"); - final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError); - when(deviceWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause)); - final Future submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT); - final Object result = Await.result(submitFuture, TIMEOUT.duration()); - Assert.assertTrue(result instanceof SubmitFailedReply); - Assert.assertEquals(cause, ((SubmitFailedReply)result).getThrowable()); - verify(deviceWriteTx).submit(); + init(mockWriteTx, system, TestActorRef.create(system, + WriteTransactionActor.props(mockWriteTx, Duration.apply(2, TimeUnit.SECONDS)))); } - @Test - public void testIdleTimeout() throws Exception { - final TestProbe testProbe = new TestProbe(system); - testProbe.watch(actorRef); - verify(deviceWriteTx, timeout(3000)).cancel(); - testProbe.expectTerminated(actorRef, TIMEOUT.duration()); + @AfterClass + public static void staticTearDown() { + TestKit.shutdownActorSystem(system, Boolean.TRUE); } - } diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTestAdapter.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTestAdapter.java new file mode 100644 index 0000000000..14fc1d7b42 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTestAdapter.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.singleton.impl.actors; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.NODE; +import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.PATH; +import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.STORE; +import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.TIMEOUT; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.testkit.TestProbe; +import com.google.common.util.concurrent.Futures; +import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; + +/** + * Adapter for write transaction tests. + * + * @author Thomas Pantelis + */ +public abstract class WriteTransactionActorTestAdapter { + private DOMDataWriteTransaction mockWriteTx; + private TestProbe probe; + private ActorRef actorRef; + private ActorSystem system; + + public void init(DOMDataWriteTransaction inMockWriteTx, ActorSystem inSystem, ActorRef inActorRef) { + this.mockWriteTx = inMockWriteTx; + this.probe = TestProbe.apply(inSystem); + this.actorRef = inActorRef; + this.system = inSystem; + } + + @Test + public void testPut() { + final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, NODE); + actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref()); + verify(mockWriteTx).put(STORE, PATH, NODE); + } + + @Test + public void testMerge() { + final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, NODE); + actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref()); + verify(mockWriteTx).merge(STORE, PATH, NODE); + } + + @Test + public void testDelete() { + actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref()); + verify(mockWriteTx).delete(STORE, PATH); + } + + @Test + public void testCancel() throws Exception { + when(mockWriteTx.cancel()).thenReturn(true); + actorRef.tell(new CancelRequest(), probe.ref()); + + verify(mockWriteTx).cancel(); + probe.expectMsg(true); + } + + @Test + public void testSubmit() throws Exception { + when(mockWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null)); + actorRef.tell(new SubmitRequest(), probe.ref()); + + verify(mockWriteTx).submit(); + probe.expectMsgClass(Success.class); + } + + @Test + public void testSubmitFail() throws Exception { + final RpcError rpcError = + RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail"); + final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError); + when(mockWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause)); + actorRef.tell(new SubmitRequest(), probe.ref()); + + verify(mockWriteTx).submit(); + final Failure response = probe.expectMsgClass(Failure.class); + assertEquals(cause, response.cause()); + } + + @Test + public void testIdleTimeout() throws Exception { + final TestProbe testProbe = new TestProbe(system); + testProbe.watch(actorRef); + verify(mockWriteTx, timeout(3000)).cancel(); + testProbe.expectTerminated(actorRef, TIMEOUT.duration()); + } +} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java deleted file mode 100644 index 12adb7c9b1..0000000000 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import akka.testkit.TestProbe; -import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.netconf.api.DocumentedException; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; - -public class ProxyReadTransactionTest { - private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; - private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; - - private ActorSystem system; - private TestProbe masterActor; - private ContainerNode node; - private ProxyReadTransaction tx; - - @Before - public void setUp() throws Exception { - system = ActorSystem.apply(); - masterActor = new TestProbe(system); - final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830)); - node = Builders.containerBuilder() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))) - .build(); - tx = new ProxyReadTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS)); - } - - @After - public void tearDown() throws Exception { - JavaTestKit.shutdownActorSystem(system, null, true); - } - - @Test - public void testRead() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); - masterActor.expectMsgClass(ReadRequest.class); - masterActor.reply(new NormalizedNodeMessage(PATH, node)); - final Optional> result = read.checkedGet(); - Assert.assertTrue(result.isPresent()); - Assert.assertEquals(node, result.get()); - } - - @Test - public void testReadEmpty() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); - masterActor.expectMsgClass(ReadRequest.class); - masterActor.reply(new EmptyReadResponse()); - final Optional> result = read.checkedGet(); - Assert.assertFalse(result.isPresent()); - } - - @Test(expected = ReadFailedException.class) - public void testReadFail() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); - masterActor.expectMsgClass(ReadRequest.class); - masterActor.reply(new RuntimeException("fail")); - read.checkedGet(); - } - - @Test - public void testExists() throws Exception { - final CheckedFuture read = tx.exists(STORE, PATH); - masterActor.expectMsgClass(ExistsRequest.class); - masterActor.reply(true); - final Boolean result = read.checkedGet(); - Assert.assertTrue(result); - } - - @Test(expected = ReadFailedException.class) - public void testExistsFail() throws Exception { - final CheckedFuture read = tx.exists(STORE, PATH); - masterActor.expectMsgClass(ExistsRequest.class); - masterActor.reply(new RuntimeException("fail")); - read.checkedGet(); - } - - @Test - public void testMasterDownRead() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); - masterActor.expectMsgClass(ReadRequest.class); - //master doesn't reply - try { - read.checkedGet(); - Assert.fail("Exception should be thrown"); - } catch (final ReadFailedException e) { - final Throwable cause = e.getCause(); - Assert.assertTrue(cause instanceof DocumentedException); - final DocumentedException de = (DocumentedException) cause; - Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity()); - Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag()); - Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType()); - } - } - -} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java index 54b887cbb8..434be24a6d 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java @@ -8,18 +8,26 @@ package org.opendaylight.netconf.topology.singleton.impl.tx; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.dispatch.Futures; +import akka.pattern.AskTimeoutException; import akka.testkit.TestProbe; +import akka.testkit.javadsl.TestKit; import akka.util.Timeout; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; import java.net.InetSocketAddress; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Assert; +import java.util.concurrent.TimeoutException; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; @@ -35,202 +43,366 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsR import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import scala.concurrent.Promise; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; public class ProxyReadWriteTransactionTest { + private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS); + private static final RemoteDeviceId DEVICE_ID = + new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830)); private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; - private ActorSystem system; + private static ActorSystem system = ActorSystem.apply(); private TestProbe masterActor; private ContainerNode node; - private ProxyReadWriteTransaction tx; @Before - public void setUp() throws Exception { - system = ActorSystem.apply(); + public void setUp() { masterActor = new TestProbe(system); - final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830)); node = Builders.containerBuilder() .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))) .build(); - tx = new ProxyReadWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS)); } - @After - public void tearDown() throws Exception { - JavaTestKit.shutdownActorSystem(system, null, true); + @AfterClass + public static void staticTearDown() { + TestKit.shutdownActorSystem(system, Boolean.TRUE); + } + + private ProxyReadWriteTransaction newSuccessfulProxyTx() { + return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS)); + } + + private ProxyReadWriteTransaction newSuccessfulProxyTx(Timeout timeout) { + return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()), + system.dispatcher(), timeout); } @Test - public void testCancel() throws Exception { - final Future submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel()); + public void testCancel() { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + tx.cancel(); masterActor.expectMsgClass(CancelRequest.class); - masterActor.reply(true); - Assert.assertTrue(submit.get()); + masterActor.reply(Boolean.TRUE); } @Test - public void testCancelSubmitted() throws Exception { - final CheckedFuture submitFuture = tx.submit(); - masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submitFuture.checkedGet(); - final Future submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel()); - masterActor.expectNoMsg(); - Assert.assertFalse(submit.get()); + public void testCommit() throws InterruptedException, ExecutionException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + commit(tx); } @Test - public void testSubmit() throws Exception { - final CheckedFuture submitFuture = tx.submit(); - masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submitFuture.checkedGet(); + public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + commit(tx); + assertFalse(tx.cancel()); } @Test - public void testDoubleSubmit() throws Exception { - final CheckedFuture submitFuture = tx.submit(); - masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submitFuture.checkedGet(); + public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + commit(tx); try { - tx.submit().checkedGet(); - Assert.fail("Should throw IllegalStateException"); + tx.commit(); + fail("Should throw IllegalStateException"); } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); + masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT); } } @Test - public void testDelete() throws Exception { + public void testDelete() { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + tx.delete(STORE, PATH); - masterActor.expectMsgClass(DeleteRequest.class); + final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class); + assertEquals(STORE, deleteRequest.getStore()); + assertEquals(PATH, deleteRequest.getPath()); } @Test - public void testDeleteClosed() throws Exception { - submit(); + public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + commit(tx); try { tx.delete(STORE, PATH); - Assert.fail("Should throw IllegalStateException"); + fail("Should throw IllegalStateException"); } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); + masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT); } } @Test - public void testPut() throws Exception { + public void testPut() { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + tx.put(STORE, PATH, node); - masterActor.expectMsgClass(PutRequest.class); + final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class); + assertEquals(STORE, putRequest.getStore()); + assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier()); + assertEquals(node, putRequest.getNormalizedNodeMessage().getNode()); } @Test - public void testPutClosed() throws Exception { - submit(); + public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + commit(tx); try { tx.put(STORE, PATH, node); - Assert.fail("Should throw IllegalStateException"); + fail("Should throw IllegalStateException"); } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); + masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT); } } @Test - public void testMerge() throws Exception { + public void testMerge() { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + tx.merge(STORE, PATH, node); - masterActor.expectMsgClass(MergeRequest.class); + final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class); + assertEquals(STORE, mergeRequest.getStore()); + assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier()); + assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode()); } @Test - public void testMergeClosed() throws Exception { - submit(); + public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + commit(tx); try { tx.merge(STORE, PATH, node); - Assert.fail("Should throw IllegalStateException"); + fail("Should throw IllegalStateException"); } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); + masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT); } } - @Test - public void testGetIdentifier() throws Exception { - Assert.assertEquals(tx, tx.getIdentifier()); - } - - private void submit() throws TransactionCommitFailedException { - final CheckedFuture submit = tx.submit(); + private void commit(ProxyReadWriteTransaction tx) + throws InterruptedException, ExecutionException, TimeoutException { + final ListenableFuture submit = tx.commit(); masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submit.checkedGet(); + masterActor.reply(new Success(null)); + submit.get(5, TimeUnit.SECONDS); } @Test public void testRead() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); - masterActor.expectMsgClass(ReadRequest.class); + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + final ListenableFuture>> read = tx.read(STORE, PATH); + final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class); + assertEquals(STORE, readRequest.getStore()); + assertEquals(PATH, readRequest.getPath()); + masterActor.reply(new NormalizedNodeMessage(PATH, node)); - final Optional> result = read.checkedGet(); - Assert.assertTrue(result.isPresent()); - Assert.assertEquals(node, result.get()); + final Optional> result = read.get(5, TimeUnit.SECONDS); + assertTrue(result.isPresent()); + assertEquals(node, result.get()); } @Test public void testReadEmpty() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + final ListenableFuture>> read = tx.read(STORE, PATH); masterActor.expectMsgClass(ReadRequest.class); masterActor.reply(new EmptyReadResponse()); - final Optional> result = read.checkedGet(); - Assert.assertFalse(result.isPresent()); + final Optional> result = read.get(5, TimeUnit.SECONDS); + assertFalse(result.isPresent()); } - @Test(expected = ReadFailedException.class) - public void testReadFail() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); + @Test + public void testReadFailure() throws InterruptedException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + final ListenableFuture>> read = tx.read(STORE, PATH); masterActor.expectMsgClass(ReadRequest.class); - masterActor.reply(new RuntimeException("fail")); - read.checkedGet(); + final RuntimeException mockEx = new RuntimeException("fail"); + masterActor.reply(new Failure(mockEx)); + + try { + read.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException); + assertEquals(mockEx, cause.getCause()); + } } @Test public void testExists() throws Exception { - final CheckedFuture read = tx.exists(STORE, PATH); + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + final ListenableFuture read = tx.exists(STORE, PATH); + final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class); + assertEquals(STORE, existsRequest.getStore()); + assertEquals(PATH, existsRequest.getPath()); + + masterActor.reply(Boolean.TRUE); + final Boolean result = read.get(5, TimeUnit.SECONDS); + assertTrue(result); + } + + @Test + public void testExistsFailure() throws InterruptedException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(); + + final ListenableFuture read = tx.exists(STORE, PATH); masterActor.expectMsgClass(ExistsRequest.class); - masterActor.reply(true); - final Boolean result = read.checkedGet(); - Assert.assertTrue(result); + final RuntimeException mockEx = new RuntimeException("fail"); + masterActor.reply(new Failure(mockEx)); + + try { + read.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException); + assertEquals(mockEx, cause.getCause()); + } } - @Test(expected = ReadFailedException.class) - public void testExistsFail() throws Exception { - final CheckedFuture read = tx.exists(STORE, PATH); + @Test + public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException { + ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS)); + + ListenableFuture future = tx.read(STORE, PATH); + masterActor.expectMsgClass(ReadRequest.class); + + // master doesn't reply + try { + future.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException); + verifyDocumentedException(cause.getCause()); + } + + future = tx.exists(STORE, PATH); masterActor.expectMsgClass(ExistsRequest.class); - masterActor.reply(new RuntimeException("fail")); - read.checkedGet(); + + // master doesn't reply + try { + future.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException); + verifyDocumentedException(cause.getCause()); + } + + future = tx.commit(); + masterActor.expectMsgClass(SubmitRequest.class); + + // master doesn't reply + try { + future.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException); + verifyDocumentedException(cause.getCause()); + } } @Test - public void testMasterDownRead() throws Exception { - final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); + public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException { + final Promise promise = Futures.promise(); + ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(), + system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS)); + + final ListenableFuture>> read = tx.read(STORE, PATH); + final ListenableFuture exists = tx.exists(STORE, PATH); + + tx.put(STORE, PATH, node); + tx.merge(STORE, PATH, node); + tx.delete(STORE, PATH); + + final ListenableFuture commit = tx.commit(); + + promise.success(masterActor.ref()); + masterActor.expectMsgClass(ReadRequest.class); - //master doesn't reply + masterActor.reply(new NormalizedNodeMessage(PATH, node)); + + masterActor.expectMsgClass(ExistsRequest.class); + masterActor.reply(Boolean.TRUE); + + masterActor.expectMsgClass(PutRequest.class); + masterActor.expectMsgClass(MergeRequest.class); + masterActor.expectMsgClass(DeleteRequest.class); + + masterActor.expectMsgClass(SubmitRequest.class); + masterActor.reply(new Success(null)); + + read.get(5, TimeUnit.SECONDS).isPresent(); + assertTrue(exists.get(5, TimeUnit.SECONDS)); + commit.get(5, TimeUnit.SECONDS); + } + + @Test + public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException { + final AskTimeoutException mockEx = new AskTimeoutException("mock"); + ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx), + system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS)); + + ListenableFuture future = tx.read(STORE, PATH); + try { + future.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException); + assertEquals(mockEx, cause.getCause()); + } + + future = tx.exists(STORE, PATH); try { - read.checkedGet(); - Assert.fail("Exception should be thrown"); - } catch (final ReadFailedException e) { - final Throwable cause = e.getCause(); - Assert.assertTrue(cause instanceof DocumentedException); - final DocumentedException de = (DocumentedException) cause; - Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity()); - Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag()); - Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType()); + future.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException); + assertEquals(mockEx, cause.getCause()); } + + tx.put(STORE, PATH, node); + tx.merge(STORE, PATH, node); + tx.delete(STORE, PATH); + + future = tx.commit(); + try { + future.get(5, TimeUnit.SECONDS); + fail("Exception should be thrown"); + } catch (final ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException); + assertEquals(mockEx, cause.getCause()); + } + } + + private void verifyDocumentedException(Throwable cause) { + assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException); + final DocumentedException de = (DocumentedException) cause; + assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity()); + assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag()); + assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType()); } } diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java deleted file mode 100644 index 159bdc1a7e..0000000000 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import akka.testkit.TestProbe; -import akka.util.Timeout; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.ListenableFuture; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; - -public class ProxyWriteTransactionTest { - private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; - private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; - - private ActorSystem system; - private TestProbe masterActor; - private ContainerNode node; - private ProxyWriteTransaction tx; - - @Before - public void setUp() throws Exception { - system = ActorSystem.apply(); - masterActor = new TestProbe(system); - final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830)); - node = Builders.containerBuilder() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))) - .build(); - tx = new ProxyWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS)); - } - - @After - public void tearDown() throws Exception { - JavaTestKit.shutdownActorSystem(system, null, true); - } - - @Test - public void testCancel() throws Exception { - final Future submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel()); - masterActor.expectMsgClass(CancelRequest.class); - masterActor.reply(true); - Assert.assertTrue(submit.get()); - } - - @Test - public void testCancelSubmitted() throws Exception { - final ListenableFuture submitFuture = tx.submit(); - masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submitFuture.get(); - final Future submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel()); - masterActor.expectNoMsg(); - Assert.assertFalse(submit.get()); - } - - @Test - public void testSubmit() throws Exception { - final ListenableFuture submitFuture = tx.submit(); - masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submitFuture.get(); - } - - @Test - public void testDoubleSubmit() throws Exception { - final ListenableFuture submitFuture = tx.submit(); - masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submitFuture.get(); - try { - tx.submit().checkedGet(); - Assert.fail("Should throw IllegalStateException"); - } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); - } - } - - @Test - public void testDelete() throws Exception { - tx.delete(STORE, PATH); - masterActor.expectMsgClass(DeleteRequest.class); - } - - @Test - public void testDeleteClosed() throws Exception { - submit(); - try { - tx.delete(STORE, PATH); - Assert.fail("Should throw IllegalStateException"); - } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); - } - } - - @Test - public void testPut() throws Exception { - tx.put(STORE, PATH, node); - masterActor.expectMsgClass(PutRequest.class); - } - - @Test - public void testPutClosed() throws Exception { - submit(); - try { - tx.put(STORE, PATH, node); - Assert.fail("Should throw IllegalStateException"); - } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); - } - } - - @Test - public void testMerge() throws Exception { - tx.merge(STORE, PATH, node); - masterActor.expectMsgClass(MergeRequest.class); - } - - @Test - public void testMergeClosed() throws Exception { - submit(); - try { - tx.merge(STORE, PATH, node); - Assert.fail("Should throw IllegalStateException"); - } catch (final IllegalStateException e) { - masterActor.expectNoMsg(); - } - } - - @Test - public void testGetIdentifier() throws Exception { - Assert.assertEquals(tx, tx.getIdentifier()); - } - - private void submit() throws TransactionCommitFailedException { - final CheckedFuture submit = tx.submit(); - masterActor.expectMsgClass(SubmitRequest.class); - masterActor.reply(new SubmitReply()); - submit.checkedGet(); - } - -} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java deleted file mode 100644 index 825a812f03..0000000000 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.MockitoAnnotations.initMocks; -import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.net.InetAddresses; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mock; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker; -import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; -import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; -import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; -import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; - -public class ReadOnlyTransactionTest { - private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds")); - private static final int TIMEOUT_SEC = 5; - private static ActorSystem system; - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - private ActorRef masterRef; - private ProxyDOMDataBroker slaveDataBroker; - private List sourceIdentifiers; - private YangInstanceIdentifier instanceIdentifier; - private LogicalDatastoreType storeType; - @Mock - private DOMDataBroker deviceDataBroker; - @Mock - private DOMDataReadOnlyTransaction readTx; - @Mock - private DOMRpcService domRpcService; - @Mock - private DOMMountPointService mountPointService; - - - @Before - public void setup() throws Exception { - initMocks(this); - - system = ActorSystem.create(); - - final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology", - new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999)); - - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); - final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, - DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService); - - masterRef = TestActorRef.create(system, props, "master_read"); - - sourceIdentifiers = Lists.newArrayList(); - - //device read tx - doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction(); - - // Create slave data broker for testing proxy - slaveDataBroker = - new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS)); - initializeDataTest(); - instanceIdentifier = YangInstanceIdentifier.EMPTY; - storeType = LogicalDatastoreType.CONFIGURATION; - } - - @After - public void teardown() { - JavaTestKit.shutdownActorSystem(system, null, true); - system = null; - } - - @Test - public void testRead() throws Exception { - // Message: NormalizedNodeMessage - final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname"))) - .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build(); - final CheckedFuture>, ReadFailedException> resultNormalizedNodeMessage = - Futures.immediateCheckedFuture(Optional.of(outputNode)); - doReturn(resultNormalizedNodeMessage).when(readTx).read(storeType, instanceIdentifier); - - final CheckedFuture>, ReadFailedException> resultNodeMessageResponse = - slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier); - - final Optional> resultNodeMessage = - resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertTrue(resultNodeMessage.isPresent()); - assertEquals(resultNodeMessage.get(), outputNode); - } - - @Test - public void testReadEmpty() throws Exception { - // Message: EmptyReadResponse - final CheckedFuture>, ReadFailedException> resultEmpty = - Futures.immediateCheckedFuture(Optional.absent()); - doReturn(resultEmpty).when(readTx).read(storeType, instanceIdentifier); - - final CheckedFuture>, ReadFailedException> resultEmptyResponse = - slaveDataBroker.newReadOnlyTransaction().read(storeType, - instanceIdentifier); - - final Optional> resultEmptyMessage = - resultEmptyResponse.get(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(resultEmptyMessage, Optional.absent()); - } - - @Test - public void testReadFail() throws Exception { - // Message: Throwable - final ReadFailedException readFailedException = new ReadFailedException("Fail", null); - final CheckedFuture>, ReadFailedException> resultThrowable = - Futures.immediateFailedCheckedFuture(readFailedException); - - doReturn(resultThrowable).when(readTx).read(storeType, instanceIdentifier); - - final CheckedFuture>, ReadFailedException> resultThrowableResponse = - slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier); - - exception.expect(ReadFailedException.class); - resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - } - - @Test - public void testExist() throws Exception { - // Message: True - final CheckedFuture resultTrue = - Futures.immediateCheckedFuture(true); - doReturn(resultTrue).when(readTx).exists(storeType, instanceIdentifier); - - final CheckedFuture trueResponse = - slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier); - - final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(true, trueMessage); - } - - @Test - public void testExistsNull() throws Exception { - // Message: False, result null - final CheckedFuture resultNull = Futures.immediateCheckedFuture(null); - doReturn(resultNull).when(readTx).exists(storeType, instanceIdentifier); - - final CheckedFuture nullResponse = - slaveDataBroker.newReadOnlyTransaction().exists(storeType, - instanceIdentifier); - - final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(false, nullFalseMessage); - } - - @Test - public void testExistsFalse() throws Exception { - // Message: False - final CheckedFuture resultFalse = Futures.immediateCheckedFuture(false); - doReturn(resultFalse).when(readTx).exists(storeType, instanceIdentifier); - - final CheckedFuture falseResponse = - slaveDataBroker.newReadOnlyTransaction().exists(storeType, - instanceIdentifier); - - final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(false, falseMessage); - } - - @Test - public void testExistsFail() throws Exception { - // Message: Throwable - final ReadFailedException readFailedException = new ReadFailedException("Fail", null); - final CheckedFuture resultThrowable = - Futures.immediateFailedCheckedFuture(readFailedException); - doReturn(resultThrowable).when(readTx).exists(storeType, instanceIdentifier); - - final CheckedFuture resultThrowableResponse = - slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier); - - exception.expect(ReadFailedException.class); - resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - } - - private void initializeDataTest() throws Exception { - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); - - final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); - - assertTrue(success instanceof MasterActorDataInitialized); - } -} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java deleted file mode 100644 index 441a4caaa3..0000000000 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import static junit.framework.TestCase.assertNull; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.MockitoAnnotations.initMocks; -import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mock; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker; -import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; -import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; -import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; -import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; - -public class ReadWriteTransactionTest { - private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds")); - private static final int TIMEOUT_SEC = 5; - private static ActorSystem system; - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @Mock - private DOMDataBroker deviceDataBroker; - @Mock - private DOMDataReadWriteTransaction readWriteTx; - @Mock - private DOMRpcService domRpcService; - @Mock - private DOMMountPointService mountPointService; - private ActorRef masterRef; - private ProxyDOMDataBroker slaveDataBroker; - private List sourceIdentifiers; - private NormalizedNode testNode; - private YangInstanceIdentifier instanceIdentifier; - private LogicalDatastoreType storeType; - - @Before - public void setup() throws Exception { - initMocks(this); - - system = ActorSystem.create(); - - final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology", - new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999)); - - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); - doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout(); - final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, - DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService); - - masterRef = TestActorRef.create(system, props, "master_read"); - - sourceIdentifiers = Lists.newArrayList(); - - doReturn(readWriteTx).when(deviceDataBroker).newReadWriteTransaction(); - doNothing().when(readWriteTx).put(storeType, instanceIdentifier, testNode); - doNothing().when(readWriteTx).merge(storeType, instanceIdentifier, testNode); - doNothing().when(readWriteTx).delete(storeType, instanceIdentifier); - - // Create slave data broker for testing proxy - slaveDataBroker = - new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS)); - initializeDataTest(); - testNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname"))) - .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build(); - instanceIdentifier = YangInstanceIdentifier.EMPTY; - storeType = LogicalDatastoreType.CONFIGURATION; - } - - @After - public void teardown() { - JavaTestKit.shutdownActorSystem(system, null, true); - system = null; - } - - @Test - public void testPut() throws Exception { - // Test of invoking put on master through slave proxy - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - wTx.put(storeType, instanceIdentifier, testNode); - - verify(readWriteTx, timeout(2000)).put(storeType, instanceIdentifier, testNode); - - wTx.cancel(); - } - - @Test - public void testMerge() throws Exception { - // Test of invoking merge on master through slave proxy - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - wTx.merge(storeType, instanceIdentifier, testNode); - - verify(readWriteTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode); - - wTx.cancel(); - } - - @Test - public void testDelete() throws Exception { - // Test of invoking delete on master through slave proxy - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); - wTx.cancel(); - - verify(readWriteTx, timeout(2000)).delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); - } - - @Test - public void testSubmit() throws Exception { - final CheckedFuture resultSubmit = Futures.immediateCheckedFuture(null); - doReturn(resultSubmit).when(readWriteTx).submit(); - - // Without Tx - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - - final CheckedFuture resultSubmitResponse = wTx.submit(); - - final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertNull(result); - } - - @Test - public void testSubmitWithOperation() throws Exception { - final CheckedFuture resultSubmitTx = - Futures.immediateCheckedFuture(null); - doReturn(resultSubmitTx).when(readWriteTx).submit(); - // With Tx - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.EMPTY); - - final CheckedFuture resultSubmitTxResponse = wTx.submit(); - - final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertNull(resultTx); - } - - @Test - public void testSubmitFail() throws Exception { - final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null); - final CheckedFuture resultThrowable = - Futures.immediateFailedCheckedFuture(throwable); - doReturn(resultThrowable).when(readWriteTx).submit(); - - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.EMPTY); - final CheckedFuture resultThrowableResponse = - wTx.submit(); - exception.expect(TransactionCommitFailedException.class); - resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - } - - @Test - public void testCancel() throws Exception { - doReturn(true).when(readWriteTx).cancel(); - - // Without Tx - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - final Boolean resultFalseNoTx = wTx.cancel(); - assertEquals(true, resultFalseNoTx); - } - - @Test - public void testCancelWithOperation() throws Exception { - doReturn(true).when(readWriteTx).cancel(); - - // With Tx, readWriteTx test - final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.EMPTY); - - final Boolean resultTrue = wTx.cancel(); - assertEquals(true, resultTrue); - - final Boolean resultFalse = wTx.cancel(); - assertEquals(false, resultFalse); - } - - @Test - public void testRead() throws Exception { - // Message: NormalizedNodeMessage - final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname"))) - .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build(); - final CheckedFuture>, ReadFailedException> resultNormalizedNodeMessage = - Futures.immediateCheckedFuture(Optional.of(outputNode)); - doReturn(resultNormalizedNodeMessage).when(readWriteTx).read(storeType, instanceIdentifier); - - final CheckedFuture>, ReadFailedException> resultNodeMessageResponse = - slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier); - - final Optional> resultNodeMessage = - resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertTrue(resultNodeMessage.isPresent()); - assertEquals(resultNodeMessage.get(), outputNode); - } - - @Test - public void testReadEmpty() throws Exception { - // Message: EmptyReadResponse - final CheckedFuture>, ReadFailedException> resultEmpty = - Futures.immediateCheckedFuture(Optional.absent()); - doReturn(resultEmpty).when(readWriteTx).read(storeType, instanceIdentifier); - - final CheckedFuture>, ReadFailedException> resultEmptyResponse = - slaveDataBroker.newReadWriteTransaction().read(storeType, - instanceIdentifier); - - final Optional> resultEmptyMessage = - resultEmptyResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(resultEmptyMessage, Optional.absent()); - } - - @Test - public void testReadFail() throws Exception { - // Message: Throwable - final ReadFailedException readFailedException = new ReadFailedException("Fail", null); - final CheckedFuture>, ReadFailedException> resultThrowable = - Futures.immediateFailedCheckedFuture(readFailedException); - - doReturn(resultThrowable).when(readWriteTx).read(storeType, instanceIdentifier); - - final CheckedFuture>, ReadFailedException> resultThrowableResponse = - slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier); - - exception.expect(ReadFailedException.class); - resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - } - - @Test - public void testExist() throws Exception { - // Message: True - final CheckedFuture resultTrue = - Futures.immediateCheckedFuture(true); - doReturn(resultTrue).when(readWriteTx).exists(storeType, instanceIdentifier); - - final CheckedFuture trueResponse = - slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier); - - final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(true, trueMessage); - } - - @Test - public void testExistsNull() throws Exception { - // Message: False, result null - final CheckedFuture resultNull = Futures.immediateCheckedFuture(null); - doReturn(resultNull).when(readWriteTx).exists(storeType, instanceIdentifier); - - final CheckedFuture nullResponse = - slaveDataBroker.newReadWriteTransaction().exists(storeType, - instanceIdentifier); - - final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(false, nullFalseMessage); - } - - @Test - public void testExistsFalse() throws Exception { - // Message: False - final CheckedFuture resultFalse = Futures.immediateCheckedFuture(false); - doReturn(resultFalse).when(readWriteTx).exists(storeType, instanceIdentifier); - - final CheckedFuture falseResponse = - slaveDataBroker.newReadWriteTransaction().exists(storeType, - instanceIdentifier); - - final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertEquals(false, falseMessage); - } - - @Test - public void testExistsFail() throws Exception { - // Message: Throwable - final ReadFailedException readFailedException = new ReadFailedException("Fail", null); - final CheckedFuture resultThrowable = - Futures.immediateFailedCheckedFuture(readFailedException); - doReturn(resultThrowable).when(readWriteTx).exists(storeType, instanceIdentifier); - - final CheckedFuture resultThrowableResponse = - slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier); - - exception.expect(ReadFailedException.class); - resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - } - - private void initializeDataTest() throws Exception { - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); - - final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); - - assertTrue(success instanceof MasterActorDataInitialized); - } - -} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java deleted file mode 100644 index a13c953625..0000000000 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.netconf.topology.singleton.impl.tx; - -import static junit.framework.TestCase.assertNull; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.MockitoAnnotations.initMocks; -import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.collect.Lists; -import com.google.common.net.InetAddresses; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mock; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker; -import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; -import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; -import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; -import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; - -public class WriteOnlyTransactionTest { - private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds")); - private static final int TIMEOUT_SEC = 5; - private static ActorSystem system; - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @Mock - private DOMDataBroker deviceDataBroker; - @Mock - private DOMDataWriteTransaction writeTx; - @Mock - private DOMRpcService domRpcService; - @Mock - private DOMMountPointService mountPointService; - private ActorRef masterRef; - private ProxyDOMDataBroker slaveDataBroker; - private List sourceIdentifiers; - private NormalizedNode testNode; - private YangInstanceIdentifier instanceIdentifier; - private LogicalDatastoreType storeType; - - @Before - public void setup() throws Exception { - initMocks(this); - - system = ActorSystem.create(); - - final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology", - new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999)); - - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); - doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout(); - final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, - DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService); - - masterRef = TestActorRef.create(system, props, "master_read"); - - sourceIdentifiers = Lists.newArrayList(); - - final DOMDataReadOnlyTransaction readTx = mock(DOMDataReadOnlyTransaction.class); - - doReturn(writeTx).when(deviceDataBroker).newWriteOnlyTransaction(); - doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction(); - doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode); - doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode); - doNothing().when(writeTx).delete(storeType, instanceIdentifier); - - // Create slave data broker for testing proxy - slaveDataBroker = - new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS)); - initializeDataTest(); - testNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname"))) - .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build(); - instanceIdentifier = YangInstanceIdentifier.EMPTY; - storeType = LogicalDatastoreType.CONFIGURATION; - } - - @After - public void teardown() { - JavaTestKit.shutdownActorSystem(system, null, true); - system = null; - } - - @Test - public void testPut() throws Exception { - // Test of invoking put on master through slave proxy - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - wTx.put(storeType, instanceIdentifier, testNode); - - verify(writeTx, timeout(2000)).put(storeType, instanceIdentifier, testNode); - - wTx.cancel(); - } - - @Test - public void testMerge() throws Exception { - // Test of invoking merge on master through slave proxy - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - wTx.merge(storeType, instanceIdentifier, testNode); - - verify(writeTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode); - - wTx.cancel(); - } - - @Test - public void testDelete() throws Exception { - // Test of invoking delete on master through slave proxy - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); - wTx.cancel(); - - verify(writeTx, timeout(2000)).delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); - } - - @Test - public void testSubmit() throws Exception { - final CheckedFuture resultSubmit = Futures.immediateCheckedFuture(null); - doReturn(resultSubmit).when(writeTx).submit(); - - // Without Tx - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - - final CheckedFuture resultSubmitResponse = wTx.submit(); - - final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertNull(result); - } - - @Test - public void testSubmitWithOperation() throws Exception { - final CheckedFuture resultSubmitTx = - Futures.immediateCheckedFuture(null); - doReturn(resultSubmitTx).when(writeTx).submit(); - // With Tx - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.EMPTY); - - final CheckedFuture resultSubmitTxResponse = wTx.submit(); - - final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - - assertNull(resultTx); - } - - @Test - public void testSubmitFail() throws Exception { - final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null); - final CheckedFuture resultThrowable = - Futures.immediateFailedCheckedFuture(throwable); - doReturn(resultThrowable).when(writeTx).submit(); - - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.EMPTY); - final CheckedFuture resultThrowableResponse = - wTx.submit(); - exception.expect(TransactionCommitFailedException.class); - resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); - } - - @Test - public void testCancel() throws Exception { - doReturn(true).when(writeTx).cancel(); - - // Without Tx - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - final Boolean resultFalseNoTx = wTx.cancel(); - assertEquals(true, resultFalseNoTx); - } - - @Test - public void testCancelWithOperation() throws Exception { - doReturn(true).when(writeTx).cancel(); - - // With Tx, readWriteTx test - final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction(); - wTx.delete(LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.EMPTY); - - final Boolean resultTrue = wTx.cancel(); - assertEquals(true, resultTrue); - - final Boolean resultFalse = wTx.cancel(); - assertEquals(false, resultFalse); - } - - private void initializeDataTest() throws Exception { - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); - - final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); - - assertTrue(success instanceof MasterActorDataInitialized); - } - -}