From 3e2c25cc572345d62ea6df81df4628f20306007b Mon Sep 17 00:00:00 2001 From: "miroslav.kovac" Date: Fri, 28 Apr 2017 10:17:44 +0200 Subject: [PATCH] Bug 8289 - 409 in cluster restperfclient test To ensure message ordering we need ReadWriteTransactionActor. This way we are sure that all reads and writes will be executed in the order that they are sent. Change-Id: I7a76f6b4d9e6e348ec0d58abe24e56dfeae66f24 Signed-off-by: miroslav.kovac --- .../singleton/impl/ProxyDOMDataBroker.java | 19 +- .../impl/actors/NetconfNodeActor.java | 11 + .../singleton/impl/actors/ReadAdapter.java | 91 +++++ .../impl/actors/ReadTransactionActor.java | 80 +--- .../actors/ReadWriteTransactionActor.java | 67 ++++ .../singleton/impl/actors/WriteAdapter.java | 74 ++++ .../impl/actors/WriteTransactionActor.java | 71 +--- .../singleton/impl/tx/ProxyReadAdapter.java | 112 ++++++ .../impl/tx/ProxyReadTransaction.java | 78 +--- .../impl/tx/ProxyReadWriteTransaction.java | 96 +++++ .../singleton/impl/tx/ProxyWriteAdapter.java | 154 ++++++++ .../impl/tx/ProxyWriteTransaction.java | 111 +----- .../messages/transactions/CancelRequest.java | 2 +- .../messages/transactions/DeleteRequest.java | 2 +- .../messages/transactions/ExistsRequest.java | 2 +- .../messages/transactions/MergeRequest.java | 2 +- .../NewReadWriteTransactionReply.java | 25 ++ .../NewReadWriteTransactionRequest.java | 15 + .../messages/transactions/PutRequest.java | 2 +- .../transactions/ReadActorMessage.java | 12 + .../messages/transactions/ReadRequest.java | 2 +- .../messages/transactions/SubmitRequest.java | 2 +- .../transactions/WriteActorMessage.java | 12 + .../actors/ReadWriteTransactionActorTest.java | 189 +++++++++ .../tx/ProxyReadWriteTransactionTest.java | 247 ++++++++++++ .../impl/tx/ReadWriteTransactionTest.java | 361 ++++++++++++++++++ 26 files changed, 1524 insertions(+), 315 deletions(-) create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActor.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionRequest.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadActorMessage.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/WriteActorMessage.java create mode 100644 netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java create mode 100644 netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java create mode 100644 netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java index 416d9bdfd2..d5c9c25a17 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java @@ -25,12 +25,14 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; -import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction; +import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadWriteTransaction; import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -49,7 +51,7 @@ public class ProxyDOMDataBroker implements DOMDataBroker { * @param actorSystem system * @param id id * @param masterNode {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref - * @param askTimeout ask timeout + * @param askTimeout ask timeout */ public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id, final ActorRef masterNode, final Timeout askTimeout) { @@ -77,7 +79,18 @@ public class ProxyDOMDataBroker implements DOMDataBroker { @Override public DOMDataReadWriteTransaction newReadWriteTransaction() { - return new ReadWriteTx(newReadOnlyTransaction(), newWriteOnlyTransaction()); + final Future txActorFuture = Patterns.ask(masterNode, new NewReadWriteTransactionRequest(), askTimeout); + try { + final Object msg = Await.result(txActorFuture, askTimeout.duration()); + if (msg instanceof Throwable) { + throw (Throwable) msg; + } + Preconditions.checkState(msg instanceof NewReadWriteTransactionReply); + final NewReadWriteTransactionReply reply = (NewReadWriteTransactionReply) msg; + return new ProxyReadWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout); + } catch (final Throwable t) { + throw new IllegalStateException("Can't create ProxyReadTransaction", t); + } } @Override diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index f04430f666..dd5e912927 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProv import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; @@ -48,6 +49,8 @@ import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -152,6 +155,14 @@ public class NetconfNodeActor extends UntypedActor { sender().tell(t, self()); } + } else if (message instanceof NewReadWriteTransactionRequest) { + try { + final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction(); + final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout)); + sender().tell(new NewReadWriteTransactionReply(txActor), self()); + } catch (final Throwable t) { + sender().tell(t, self()); + } } else if (message instanceof InvokeRpcMessage) { // master final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message); diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java new file mode 100644 index 0000000000..0ef08f0d2e --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.actors; + +import akka.actor.ActorRef; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +class ReadAdapter { + + private final DOMDataReadTransaction tx; + + public ReadAdapter(final DOMDataReadTransaction tx) { + this.tx = tx; + } + + public void handle(final Object message, final ActorRef sender, final ActorRef self) throws Throwable { + if (message instanceof ReadRequest) { + + final ReadRequest readRequest = (ReadRequest) message; + final YangInstanceIdentifier path = readRequest.getPath(); + final LogicalDatastoreType store = readRequest.getStore(); + read(path, store, sender, self); + + } else if (message instanceof ExistsRequest) { + final ExistsRequest readRequest = (ExistsRequest) message; + final YangInstanceIdentifier path = readRequest.getPath(); + final LogicalDatastoreType store = readRequest.getStore(); + exists(path, store, sender, self); + } + } + + private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender, + final ActorRef self) { + final CheckedFuture>, ReadFailedException> read = tx.read(store, path); + Futures.addCallback(read, new FutureCallback>>() { + + @Override + public void onSuccess(final Optional> result) { + if (!result.isPresent()) { + sender.tell(new EmptyReadResponse(), self); + return; + } + sender.tell(new NormalizedNodeMessage(path, result.get()), self); + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + sender.tell(throwable, self); + } + }); + } + + private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender, + final ActorRef self) { + final CheckedFuture readFuture = tx.exists(store, path); + Futures.addCallback(readFuture, new FutureCallback() { + @Override + public void onSuccess(final Boolean result) { + if (result == null) { + sender.tell(false, self); + } else { + sender.tell(result, self); + } + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + sender.tell(throwable, self); + } + }); + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java index b6bf651ea2..777140cd4f 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java @@ -8,30 +8,21 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; -import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import javax.annotation.Nonnull; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadActorMessage; /** * ReadTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes. */ public class ReadTransactionActor extends UntypedActor { - private final DOMDataReadOnlyTransaction tx; + private final ReadAdapter readAdapter; + + private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) { + readAdapter = new ReadAdapter(tx); + } /** * Creates new actor Props. @@ -43,68 +34,13 @@ public class ReadTransactionActor extends UntypedActor { return Props.create(ReadTransactionActor.class, () -> new ReadTransactionActor(tx)); } - private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) { - this.tx = tx; - } - @Override public void onReceive(final Object message) throws Throwable { - if (message instanceof ReadRequest) { - - final ReadRequest readRequest = (ReadRequest) message; - final YangInstanceIdentifier path = readRequest.getPath(); - final LogicalDatastoreType store = readRequest.getStore(); - read(path, store, sender(), self()); - - } else if (message instanceof ExistsRequest) { - final ExistsRequest readRequest = (ExistsRequest) message; - final YangInstanceIdentifier path = readRequest.getPath(); - final LogicalDatastoreType store = readRequest.getStore(); - exists(path, store, sender(), self()); - + if (message instanceof ReadActorMessage) { + readAdapter.handle(message, sender(), self()); } else { unhandled(message); } } - private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender, - final ActorRef self) { - final CheckedFuture>, ReadFailedException> read = tx.read(store, path); - Futures.addCallback(read, new FutureCallback>>() { - - @Override - public void onSuccess(final Optional> result) { - if (!result.isPresent()) { - sender.tell(new EmptyReadResponse(), self); - return; - } - sender.tell(new NormalizedNodeMessage(path, result.get()), self); - } - - @Override - public void onFailure(@Nonnull final Throwable throwable) { - sender.tell(throwable, self); - } - }); - } - - private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender, - final ActorRef self) { - final CheckedFuture readFuture = tx.exists(store, path); - Futures.addCallback(readFuture, new FutureCallback() { - @Override - public void onSuccess(final Boolean result) { - if (result == null) { - sender.tell(false, self); - } else { - sender.tell(result, self); - } - } - - @Override - public void onFailure(@Nonnull final Throwable throwable) { - sender.tell(throwable, self); - } - }); - } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActor.java new file mode 100644 index 0000000000..1c263c0191 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActor.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.actors; + +import akka.actor.Props; +import akka.actor.ReceiveTimeout; +import akka.actor.UntypedActor; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadActorMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +public class ReadWriteTransactionActor extends UntypedActor { + + private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTransactionActor.class); + + private final DOMDataReadWriteTransaction tx; + private final long idleTimeout; + private final ReadAdapter readAdapter; + private final WriteAdapter writeAdapter; + + private ReadWriteTransactionActor(final DOMDataReadWriteTransaction tx, final Duration idleTimeout) { + this.tx = tx; + this.idleTimeout = idleTimeout.toSeconds(); + if (this.idleTimeout > 0) { + context().setReceiveTimeout(idleTimeout); + } + readAdapter = new ReadAdapter(tx); + writeAdapter = new WriteAdapter(tx); + } + + /** + * Creates new actor Props. + * + * @param tx delegate device read write transaction + * @param idleTimeout idle time in seconds, after which transaction is closed automatically + * @return props + */ + static Props props(final DOMDataReadWriteTransaction tx, final Duration idleTimeout) { + return Props.create(ReadWriteTransactionActor.class, () -> new ReadWriteTransactionActor(tx, idleTimeout)); + } + + @Override + public void onReceive(final Object message) throws Throwable { + if (message instanceof ReadActorMessage) { + readAdapter.handle(message, sender(), self()); + } else if (message instanceof WriteActorMessage) { + writeAdapter.handle(message, sender(), context(), self()); + } else if (message instanceof ReceiveTimeout) { + LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor", + idleTimeout); + tx.cancel(); + context().stop(self()); + } else { + unhandled(message); + } + } + +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java new file mode 100644 index 0000000000..7e762bf264 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.actors; + +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; + +class WriteAdapter { + private final DOMDataWriteTransaction tx; + + public WriteAdapter(final DOMDataWriteTransaction tx) { + this.tx = tx; + } + + private void cancel(final ActorContext context, final ActorRef sender, final ActorRef self) { + final boolean cancelled = tx.cancel(); + sender.tell(cancelled, self); + context.stop(self); + } + + private void submit(final ActorRef requester, final ActorRef self, final ActorContext context) { + final CheckedFuture submitFuture = tx.submit(); + context.stop(self); + Futures.addCallback(submitFuture, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + requester.tell(new SubmitReply(), self); + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + requester.tell(throwable, self); + } + }); + } + + public void handle(final Object message, final ActorRef sender, final ActorContext context, final ActorRef self) { + if (message instanceof MergeRequest) { + final MergeRequest mergeRequest = (MergeRequest) message; + final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage(); + tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode()); + } else if (message instanceof PutRequest) { + final PutRequest putRequest = (PutRequest) message; + final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage(); + tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode()); + } else if (message instanceof DeleteRequest) { + final DeleteRequest deleteRequest = (DeleteRequest) message; + tx.delete(deleteRequest.getStore(), deleteRequest.getPath()); + } else if (message instanceof CancelRequest) { + cancel(context, sender, self); + } else if (message instanceof SubmitRequest) { + submit(sender, self, context); + } + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java index 008559ec9b..8a1f53af52 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java @@ -8,24 +8,12 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; -import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.ReceiveTimeout; import akka.actor.UntypedActor; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import javax.annotation.Nonnull; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -39,11 +27,21 @@ public class WriteTransactionActor extends UntypedActor { private final DOMDataWriteTransaction tx; private final long idleTimeout; + private final WriteAdapter writeAdapter; + + private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) { + this.tx = tx; + this.idleTimeout = idleTimeout.toSeconds(); + if (this.idleTimeout > 0) { + context().setReceiveTimeout(idleTimeout); + } + writeAdapter = new WriteAdapter(tx); + } /** * Creates new actor Props. * - * @param tx delegate device write transaction + * @param tx delegate device write transaction * @param idleTimeout idle time in seconds, after which transaction is closed automatically * @return props */ @@ -51,31 +49,10 @@ public class WriteTransactionActor extends UntypedActor { return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout)); } - private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) { - this.tx = tx; - this.idleTimeout = idleTimeout.toSeconds(); - if (this.idleTimeout > 0) { - context().setReceiveTimeout(idleTimeout); - } - } - @Override public void onReceive(final Object message) throws Throwable { - if (message instanceof MergeRequest) { - final MergeRequest mergeRequest = (MergeRequest) message; - final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage(); - tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode()); - } else if (message instanceof PutRequest) { - final PutRequest putRequest = (PutRequest) message; - final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage(); - tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode()); - } else if (message instanceof DeleteRequest) { - final DeleteRequest deleteRequest = (DeleteRequest) message; - tx.delete(deleteRequest.getStore(), deleteRequest.getPath()); - } else if (message instanceof CancelRequest) { - cancel(); - } else if (message instanceof SubmitRequest) { - submit(sender(), self()); + if (message instanceof WriteActorMessage) { + writeAdapter.handle(message, sender(), context(), self()); } else if (message instanceof ReceiveTimeout) { LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor", idleTimeout); @@ -86,25 +63,5 @@ public class WriteTransactionActor extends UntypedActor { } } - private void cancel() { - final boolean cancelled = tx.cancel(); - sender().tell(cancelled, self()); - context().stop(self()); - } - - private void submit(final ActorRef requester, final ActorRef self) { - final CheckedFuture submitFuture = tx.submit(); - context().stop(self); - Futures.addCallback(submitFuture, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - requester.tell(new SubmitReply(), self); - } - @Override - public void onFailure(@Nonnull final Throwable throwable) { - requester.tell(throwable, self); - } - }); - } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java new file mode 100644 index 0000000000..b500e0d5c3 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +class ProxyReadAdapter { + private static final Logger LOG = LoggerFactory.getLogger(ProxyReadAdapter.class); + + private final ActorRef masterTxActor; + private final RemoteDeviceId id; + private final ActorSystem actorSystem; + private final Timeout askTimeout; + + public ProxyReadAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, + final Timeout askTimeout) { + this.masterTxActor = masterTxActor; + this.id = id; + this.actorSystem = actorSystem; + this.askTimeout = askTimeout; + } + + public void close() { + //noop + } + + public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + LOG.trace("{}: Read {} via NETCONF: {}", id, store, path); + + final Future future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout); + final SettableFuture>> settableFuture = SettableFuture.create(); + future.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, + final Object success) throws Throwable { + if (failure != null) { // ask timeout + final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); + settableFuture.setException(exception); + return; + } + if (success instanceof Throwable) { // Error sended by master + settableFuture.setException((Throwable) success); + return; + } + if (success instanceof EmptyReadResponse) { + settableFuture.set(Optional.absent()); + return; + } + if (success instanceof NormalizedNodeMessage) { + final NormalizedNodeMessage data = (NormalizedNodeMessage) success; + settableFuture.set(Optional.of(data.getNode())); + } + } + }, actorSystem.dispatcher()); + return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER); + } + + public CheckedFuture exists(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + final Future existsScalaFuture = + Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout); + + LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path); + + final SettableFuture settableFuture = SettableFuture.create(); + existsScalaFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object success) throws Throwable { + if (failure != null) { // ask timeout + final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); + settableFuture.setException(exception); + return; + } + if (success instanceof Throwable) { + settableFuture.setException((Throwable) success); + return; + } + settableFuture.set((Boolean) success); + } + }, actorSystem.dispatcher()); + return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER); + } + +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java index 0756f33ddf..9ef11bc63d 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java @@ -10,27 +10,15 @@ package org.opendaylight.netconf.topology.singleton.impl.tx; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; /** * ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master @@ -38,12 +26,7 @@ import scala.concurrent.Future; */ public class ProxyReadTransaction implements DOMDataReadOnlyTransaction { - private static final Logger LOG = LoggerFactory.getLogger(ProxyReadTransaction.class); - - private final ActorRef masterTxActor; - private final RemoteDeviceId id; - private final ActorSystem actorSystem; - private final Timeout askTimeout; + private final ProxyReadAdapter delegate; /** * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref @@ -53,75 +36,24 @@ public class ProxyReadTransaction implements DOMDataReadOnlyTransaction { */ public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, final Timeout askTimeout) { - this.masterTxActor = masterTxActor; - this.id = id; - this.actorSystem = actorSystem; - this.askTimeout = askTimeout; + delegate = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout); } @Override public void close() { - //noop + delegate.close(); } @Override public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) { - LOG.trace("{}: Read {} via NETCONF: {}", id, store, path); - - final Future future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout); - final SettableFuture>> settableFuture = SettableFuture.create(); - future.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, - final Object success) throws Throwable { - if (failure != null) { // ask timeout - final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); - settableFuture.setException(exception); - return; - } - if (success instanceof Throwable) { // Error sended by master - settableFuture.setException((Throwable) success); - return; - } - if (success instanceof EmptyReadResponse) { - settableFuture.set(Optional.absent()); - return; - } - if (success instanceof NormalizedNodeMessage) { - final NormalizedNodeMessage data = (NormalizedNodeMessage) success; - settableFuture.set(Optional.of(data.getNode())); - } - } - }, actorSystem.dispatcher()); - return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER); + return delegate.read(store, path); } @Override public CheckedFuture exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) { - final Future existsScalaFuture = - Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout); - - LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path); - - final SettableFuture settableFuture = SettableFuture.create(); - existsScalaFuture.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object success) throws Throwable { - if (failure != null) { // ask timeout - final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); - settableFuture.setException(exception); - return; - } - if (success instanceof Throwable) { - settableFuture.setException((Throwable) success); - return; - } - settableFuture.set((Boolean) success); - } - }, actorSystem.dispatcher()); - return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER); + return delegate.exists(store, path); } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java new file mode 100644 index 0000000000..2791389c0e --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master + * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}. + */ +public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction { + + private final ProxyReadAdapter delegateRead; + private final ProxyWriteAdapter delegateWrite; + + /** + * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor} ref + * @param id device id + * @param actorSystem system + * @param askTimeout + */ + public ProxyReadWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, + final ActorSystem actorSystem, final Timeout askTimeout) { + delegateRead = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout); + delegateWrite = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout); + } + + @Override + public boolean cancel() { + return delegateWrite.cancel(); + } + + @Override + public ListenableFuture> commit() { + return delegateWrite.commit(getIdentifier()); + } + + @Override + public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + return delegateRead.read(store, path); + } + + @Override + public CheckedFuture exists(final LogicalDatastoreType store, + final YangInstanceIdentifier path) { + return delegateRead.exists(store, path); + } + + @Override + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + delegateWrite.delete(store, path); + } + + @Override + public CheckedFuture submit() { + return delegateWrite.submit(getIdentifier()); + } + + @Override + public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, + final NormalizedNode data) { + delegateWrite.put(store, path, data, getIdentifier()); + } + + @Override + public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, + final NormalizedNode data) { + delegateWrite.merge(store, path, data, getIdentifier()); + } + + @Override + public Object getIdentifier() { + return this; + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java new file mode 100644 index 0000000000..e9be9e7747 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; + +public class ProxyWriteAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteAdapter.class); + + private final ActorRef masterTxActor; + private final RemoteDeviceId id; + private final ActorSystem actorSystem; + private final AtomicBoolean opened = new AtomicBoolean(true); + private final Timeout askTimeout; + + public ProxyWriteAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, + final Timeout askTimeout) { + this.masterTxActor = masterTxActor; + this.id = id; + this.actorSystem = actorSystem; + this.askTimeout = askTimeout; + } + + public boolean cancel() { + if (!opened.compareAndSet(true, false)) { + return false; + } + final Future cancelScalaFuture = + Patterns.ask(masterTxActor, new CancelRequest(), askTimeout); + + LOG.trace("{}: Cancel {} via NETCONF", id); + + try { + // here must be Await because AsyncWriteTransaction do not return future + return (boolean) Await.result(cancelScalaFuture, askTimeout.duration()); + } catch (final Exception e) { + return false; + } + } + + public CheckedFuture submit(final Object identifier) { + if (!opened.compareAndSet(true, false)) { + throw new IllegalStateException(id + ": Transaction" + identifier + " is closed"); + } + final Future submitScalaFuture = + Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout); + + LOG.trace("{}: Submit {} via NETCONF", id); + + final SettableFuture settableFuture = SettableFuture.create(); + submitScalaFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object success) throws Throwable { + if (failure != null) { // ask timeout + final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); + settableFuture.setException(exception); + return; + } + if (success instanceof Throwable) { + settableFuture.setException((Throwable) success); + } else { + if (success instanceof SubmitFailedReply) { + LOG.error("{}: Transaction was not submitted because already closed.", id); + } + settableFuture.set(null); + } + } + }, actorSystem.dispatcher()); + + return Futures.makeChecked(settableFuture, new Function() { + @Nullable + @Override + public TransactionCommitFailedException apply(@Nullable final Exception input) { + final String message = "Submit of transaction " + identifier + " failed"; + return new TransactionCommitFailedException(message, input); + } + }); + } + + public ListenableFuture> commit(final Object identifier) { + LOG.trace("{}: Commit", id); + + final CheckedFuture submit = submit(identifier); + return Futures.transform(submit, new Function>() { + @Nullable + @Override + public RpcResult apply(@Nullable final Void input) { + return RpcResultBuilder.success(TransactionStatus.SUBMITED).build(); + } + }); + } + + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) { + Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier); + LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier); + masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender()); + } + + public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, + final NormalizedNode data, final Object identifier) { + Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier); + final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data); + LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, path, data); + masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender()); + } + + public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, + final NormalizedNode data, final Object identifier) { + Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier); + final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data); + LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data); + masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender()); + } + +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java index 072f6227b4..eb64eaf886 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java @@ -10,38 +10,17 @@ package org.opendaylight.netconf.topology.singleton.impl.tx; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; import akka.util.Timeout; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; -import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.Future; /** * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master @@ -49,13 +28,7 @@ import scala.concurrent.Future; */ public class ProxyWriteTransaction implements DOMDataWriteTransaction { - private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteTransaction.class); - - private final ActorRef masterTxActor; - private final RemoteDeviceId id; - private final ActorSystem actorSystem; - private final AtomicBoolean opened = new AtomicBoolean(true); - private final Timeout askTimeout; + private final ProxyWriteAdapter proxyWriteAdapter; /** * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref @@ -65,107 +38,39 @@ public class ProxyWriteTransaction implements DOMDataWriteTransaction { */ public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem, final Timeout askTimeout) { - this.masterTxActor = masterTxActor; - this.id = id; - this.actorSystem = actorSystem; - this.askTimeout = askTimeout; + proxyWriteAdapter = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout); } @Override public boolean cancel() { - if (!opened.compareAndSet(true, false)) { - return false; - } - final Future cancelScalaFuture = - Patterns.ask(masterTxActor, new CancelRequest(), askTimeout); - - LOG.trace("{}: Cancel {} via NETCONF", id); - - try { - // here must be Await because AsyncWriteTransaction do not return future - return (boolean) Await.result(cancelScalaFuture, askTimeout.duration()); - } catch (final Exception e) { - return false; - } + return proxyWriteAdapter.cancel(); } @Override public CheckedFuture submit() { - if (!opened.compareAndSet(true, false)) { - throw new IllegalStateException(id + ": Transaction" + getIdentifier() + " is closed"); - } - final Future submitScalaFuture = - Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout); - - LOG.trace("{}: Submit {} via NETCONF", id); - - final SettableFuture settableFuture = SettableFuture.create(); - submitScalaFuture.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object success) throws Throwable { - if (failure != null) { // ask timeout - final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); - settableFuture.setException(exception); - return; - } - if (success instanceof Throwable) { - settableFuture.setException((Throwable) success); - } else { - if (success instanceof SubmitFailedReply) { - LOG.error("{}: Transaction was not submitted because already closed.", id); - } - settableFuture.set(null); - } - } - }, actorSystem.dispatcher()); - - return Futures.makeChecked(settableFuture, new Function() { - @Nullable - @Override - public TransactionCommitFailedException apply(@Nullable final Exception input) { - final String message = "Submit of transaction " + getIdentifier() + " failed"; - return new TransactionCommitFailedException(message, input); - } - }); + return proxyWriteAdapter.submit(getIdentifier()); } @Override public ListenableFuture> commit() { - LOG.trace("{}: Commit", id); - - final CheckedFuture submit = submit(); - return Futures.transform(submit, new Function>() { - @Nullable - @Override - public RpcResult apply(@Nullable final Void input) { - return RpcResultBuilder.success(TransactionStatus.SUBMITED).build(); - } - }); + return proxyWriteAdapter.commit(getIdentifier()); } @Override public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) { - Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier()); - LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier); - masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender()); + proxyWriteAdapter.delete(store, identifier); } @Override public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier, final NormalizedNode data) { - Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier()); - final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data); - LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, identifier, data); - masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender()); + proxyWriteAdapter.put(store, identifier, data, getIdentifier()); } @Override public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier, final NormalizedNode data) { - Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier()); - final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data); - LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, identifier, data); - masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender()); + proxyWriteAdapter.merge(store, identifier, data, getIdentifier()); } @Override diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java index 7a73fe2e90..72d8e9e849 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java @@ -8,6 +8,6 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions; -public class CancelRequest implements TransactionRequest { +public class CancelRequest implements WriteActorMessage { private static final long serialVersionUID = 1L; } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java index 1548dc791a..5e46157e40 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java @@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -public class DeleteRequest implements TransactionRequest { +public class DeleteRequest implements WriteActorMessage { private static final long serialVersionUID = 1L; private final LogicalDatastoreType store; diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java index b5fae5f029..9bce961fcd 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java @@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -public class ExistsRequest implements TransactionRequest { +public class ExistsRequest implements ReadActorMessage { private static final long serialVersionUID = 1L; private final LogicalDatastoreType store; diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java index 8c03023654..536d52172d 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java @@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -public class MergeRequest implements TransactionRequest { +public class MergeRequest implements WriteActorMessage { private static final long serialVersionUID = 1L; private final NormalizedNodeMessage data; diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java new file mode 100644 index 0000000000..141ecccdd3 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages.transactions; + +import akka.actor.ActorRef; +import java.io.Serializable; + +public class NewReadWriteTransactionReply implements Serializable { + + private final ActorRef txActor; + + public NewReadWriteTransactionReply(final ActorRef txActor) { + this.txActor = txActor; + } + + public ActorRef getTxActor() { + return txActor; + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionRequest.java new file mode 100644 index 0000000000..9a9a5852b5 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionRequest.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages.transactions; + +import java.io.Serializable; + +public class NewReadWriteTransactionRequest implements Serializable { + private static final long serialVersionUID = 1L; +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java index 41de9c22b7..c9084b813e 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java @@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -public class PutRequest implements TransactionRequest { +public class PutRequest implements WriteActorMessage { private static final long serialVersionUID = 1L; private final LogicalDatastoreType store; diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadActorMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadActorMessage.java new file mode 100644 index 0000000000..88bae04fc2 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadActorMessage.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages.transactions; + +public interface ReadActorMessage extends TransactionRequest { +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java index d950f28557..b64f46cd91 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java @@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -public class ReadRequest implements TransactionRequest { +public class ReadRequest implements ReadActorMessage { private static final long serialVersionUID = 1L; private final LogicalDatastoreType store; diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java index 6b6af7e773..64da32a0fe 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java @@ -8,6 +8,6 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions; -public class SubmitRequest implements TransactionRequest { +public class SubmitRequest implements WriteActorMessage { private static final long serialVersionUID = 1L; } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/WriteActorMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/WriteActorMessage.java new file mode 100644 index 0000000000..07af5568a2 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/WriteActorMessage.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages.transactions; + +public interface WriteActorMessage extends TransactionRequest { +} diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java new file mode 100644 index 0000000000..08b755ef98 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.actors; + +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.testkit.TestProbe; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +public class ReadWriteTransactionActorTest { + + private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; + private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; + private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS); + + @Mock + private DOMDataReadWriteTransaction deviceReadWriteTx; + private TestProbe probe; + private ActorSystem system; + private TestActorRef actorRef; + private NormalizedNode node; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + system = ActorSystem.apply(); + probe = TestProbe.apply(system); + node = Builders.containerBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont"))) + .build(); + actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(deviceReadWriteTx, + Duration.apply(2, TimeUnit.SECONDS)), "testA"); + } + + @After + public void tearDown() throws Exception { + JavaTestKit.shutdownActorSystem(system, null, true); + } + + @Test + public void testRead() throws Exception { + final ContainerNode node = Builders.containerBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont"))) + .build(); + when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node))); + actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); + verify(deviceReadWriteTx).read(STORE, PATH); + probe.expectMsgClass(NormalizedNodeMessage.class); + } + + @Test + public void testReadEmpty() throws Exception { + when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent())); + actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); + verify(deviceReadWriteTx).read(STORE, PATH); + probe.expectMsgClass(EmptyReadResponse.class); + } + + @Test + public void testReadFailure() throws Exception { + final ReadFailedException cause = new ReadFailedException("fail"); + when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); + actorRef.tell(new ReadRequest(STORE, PATH), probe.ref()); + verify(deviceReadWriteTx).read(STORE, PATH); + probe.expectMsg(cause); + } + + @Test + public void testExists() throws Exception { + when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true)); + actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); + verify(deviceReadWriteTx).exists(STORE, PATH); + probe.expectMsg(true); + } + + @Test + public void testExistsFailure() throws Exception { + final ReadFailedException cause = new ReadFailedException("fail"); + when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause)); + actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref()); + verify(deviceReadWriteTx).exists(STORE, PATH); + probe.expectMsg(cause); + } + + @Test + public void testPut() throws Exception { + final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node); + actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref()); + verify(deviceReadWriteTx).put(STORE, PATH, node); + } + + @Test + public void testMerge() throws Exception { + final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node); + actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref()); + verify(deviceReadWriteTx).merge(STORE, PATH, node); + } + + @Test + public void testDelete() throws Exception { + actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref()); + verify(deviceReadWriteTx).delete(STORE, PATH); + } + + @Test + public void testCancel() throws Exception { + when(deviceReadWriteTx.cancel()).thenReturn(true); + final Future cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT); + final Object result = Await.result(cancelFuture, TIMEOUT.duration()); + Preconditions.checkState(result instanceof Boolean); + verify(deviceReadWriteTx).cancel(); + Assert.assertTrue((Boolean) result); + } + + @Test + public void testSubmit() throws Exception { + when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null)); + final Future submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT); + final Object result = Await.result(submitFuture, TIMEOUT.duration()); + Assert.assertTrue(result instanceof SubmitReply); + verify(deviceReadWriteTx).submit(); + } + + @Test + public void testSubmitFail() throws Exception { + final RpcError rpcError = + RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail"); + final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError); + when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause)); + final Future submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT); + final Object result = Await.result(submitFuture, TIMEOUT.duration()); + Assert.assertEquals(cause, result); + verify(deviceReadWriteTx).submit(); + } + + @Test + public void testIdleTimeout() throws Exception { + final TestProbe probe = new TestProbe(system); + probe.watch(actorRef); + verify(deviceReadWriteTx, timeout(3000)).cancel(); + probe.expectTerminated(actorRef, TIMEOUT.duration()); + } +} \ No newline at end of file diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java new file mode 100644 index 0000000000..a24f2b124a --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.config.util.xml.DocumentedException; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; + +public class ProxyReadWriteTransactionTest { + private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY; + private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION; + + private ActorSystem system; + private TestProbe masterActor; + private ContainerNode node; + private ProxyReadWriteTransaction tx; + + @Before + public void setUp() throws Exception { + system = ActorSystem.apply(); + masterActor = new TestProbe(system); + final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830)); + node = Builders.containerBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont"))) + .build(); + tx = new ProxyReadWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS)); + } + + @After + public void tearDown() throws Exception { + JavaTestKit.shutdownActorSystem(system, null, true); + } + + @Test + public void testCancel() throws Exception { + final Future submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel()); + masterActor.expectMsgClass(CancelRequest.class); + masterActor.reply(true); + Assert.assertTrue(submit.get()); + } + + @Test + public void testCancelSubmitted() throws Exception { + final CheckedFuture submitFuture = tx.submit(); + masterActor.expectMsgClass(SubmitRequest.class); + masterActor.reply(new SubmitReply()); + submitFuture.checkedGet(); + final Future submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel()); + masterActor.expectNoMsg(); + Assert.assertFalse(submit.get()); + } + + @Test + public void testSubmit() throws Exception { + final CheckedFuture submitFuture = tx.submit(); + masterActor.expectMsgClass(SubmitRequest.class); + masterActor.reply(new SubmitReply()); + submitFuture.checkedGet(); + } + + @Test + public void testDoubleSubmit() throws Exception { + final CheckedFuture submitFuture = tx.submit(); + masterActor.expectMsgClass(SubmitRequest.class); + masterActor.reply(new SubmitReply()); + submitFuture.checkedGet(); + try { + tx.submit().checkedGet(); + Assert.fail("Should throw IllegalStateException"); + } catch (final IllegalStateException e) { + masterActor.expectNoMsg(); + } + } + + @Test + public void testCommit() throws Exception { + final ListenableFuture> submitFuture = tx.commit(); + masterActor.expectMsgClass(SubmitRequest.class); + masterActor.reply(new SubmitReply()); + Assert.assertEquals(TransactionStatus.SUBMITED, submitFuture.get().getResult()); + } + + @Test + public void testDelete() throws Exception { + tx.delete(STORE, PATH); + masterActor.expectMsgClass(DeleteRequest.class); + } + + @Test + public void testDeleteClosed() throws Exception { + submit(); + try { + tx.delete(STORE, PATH); + Assert.fail("Should throw IllegalStateException"); + } catch (final IllegalStateException e) { + masterActor.expectNoMsg(); + } + } + + @Test + public void testPut() throws Exception { + tx.put(STORE, PATH, node); + masterActor.expectMsgClass(PutRequest.class); + } + + @Test + public void testPutClosed() throws Exception { + submit(); + try { + tx.put(STORE, PATH, node); + Assert.fail("Should throw IllegalStateException"); + } catch (final IllegalStateException e) { + masterActor.expectNoMsg(); + } + } + + @Test + public void testMerge() throws Exception { + tx.merge(STORE, PATH, node); + masterActor.expectMsgClass(MergeRequest.class); + } + + @Test + public void testMergeClosed() throws Exception { + submit(); + try { + tx.merge(STORE, PATH, node); + Assert.fail("Should throw IllegalStateException"); + } catch (final IllegalStateException e) { + masterActor.expectNoMsg(); + } + } + + @Test + public void testGetIdentifier() throws Exception { + Assert.assertEquals(tx, tx.getIdentifier()); + } + + private void submit() throws TransactionCommitFailedException { + final CheckedFuture submit = tx.submit(); + masterActor.expectMsgClass(SubmitRequest.class); + masterActor.reply(new SubmitReply()); + submit.checkedGet(); + } + + @Test + public void testRead() throws Exception { + final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); + masterActor.expectMsgClass(ReadRequest.class); + masterActor.reply(new NormalizedNodeMessage(PATH, node)); + final Optional> result = read.checkedGet(); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(node, result.get()); + } + + @Test + public void testReadEmpty() throws Exception { + final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); + masterActor.expectMsgClass(ReadRequest.class); + masterActor.reply(new EmptyReadResponse()); + final Optional> result = read.checkedGet(); + Assert.assertFalse(result.isPresent()); + } + + @Test(expected = ReadFailedException.class) + public void testReadFail() throws Exception { + final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); + masterActor.expectMsgClass(ReadRequest.class); + masterActor.reply(new RuntimeException("fail")); + read.checkedGet(); + } + + @Test + public void testExists() throws Exception { + final CheckedFuture read = tx.exists(STORE, PATH); + masterActor.expectMsgClass(ExistsRequest.class); + masterActor.reply(true); + final Boolean result = read.checkedGet(); + Assert.assertTrue(result); + } + + @Test(expected = ReadFailedException.class) + public void testExistsFail() throws Exception { + final CheckedFuture read = tx.exists(STORE, PATH); + masterActor.expectMsgClass(ExistsRequest.class); + masterActor.reply(new RuntimeException("fail")); + read.checkedGet(); + } + + @Test + public void testMasterDownRead() throws Exception { + final CheckedFuture>, ReadFailedException> read = tx.read(STORE, PATH); + masterActor.expectMsgClass(ReadRequest.class); + //master doesn't reply + try { + read.checkedGet(); + Assert.fail("Exception should be thrown"); + } catch (final ReadFailedException e) { + final Throwable cause = e.getCause(); + Assert.assertTrue(cause instanceof DocumentedException); + final DocumentedException de = (DocumentedException) cause; + Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity()); + Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag()); + Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType()); + } + } +} \ No newline at end of file diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java new file mode 100644 index 0000000000..c50cef45a5 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java @@ -0,0 +1,361 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.tx; + +import static junit.framework.TestCase.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker; +import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; +import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; +import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +public class ReadWriteTransactionTest { + private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds")); + private static final int TIMEOUT_SEC = 5; + private static ActorSystem system; + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Mock + private DOMDataBroker deviceDataBroker; + @Mock + private DOMDataReadWriteTransaction readWriteTx; + @Mock + private DOMRpcService domRpcService; + @Mock + private DOMMountPointService mountPointService; + private ActorRef masterRef; + private ProxyDOMDataBroker slaveDataBroker; + private List sourceIdentifiers; + private NormalizedNode testNode; + private YangInstanceIdentifier instanceIdentifier; + private LogicalDatastoreType storeType; + + @Before + public void setup() throws Exception { + initMocks(this); + + system = ActorSystem.create(); + + final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology", + new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999)); + + final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); + doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout(); + final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, + DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService); + + masterRef = TestActorRef.create(system, props, "master_read"); + + sourceIdentifiers = Lists.newArrayList(); + + doReturn(readWriteTx).when(deviceDataBroker).newReadWriteTransaction(); + doNothing().when(readWriteTx).put(storeType, instanceIdentifier, testNode); + doNothing().when(readWriteTx).merge(storeType, instanceIdentifier, testNode); + doNothing().when(readWriteTx).delete(storeType, instanceIdentifier); + + // Create slave data broker for testing proxy + slaveDataBroker = + new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS)); + initializeDataTest(); + testNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname"))) + .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build(); + instanceIdentifier = YangInstanceIdentifier.EMPTY; + storeType = LogicalDatastoreType.CONFIGURATION; + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system, null, true); + system = null; + } + + @Test + public void testPut() throws Exception { + // Test of invoking put on master through slave proxy + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + wTx.put(storeType, instanceIdentifier, testNode); + + verify(readWriteTx, timeout(2000)).put(storeType, instanceIdentifier, testNode); + + wTx.cancel(); + } + + @Test + public void testMerge() throws Exception { + // Test of invoking merge on master through slave proxy + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + wTx.merge(storeType, instanceIdentifier, testNode); + + verify(readWriteTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode); + + wTx.cancel(); + } + + @Test + public void testDelete() throws Exception { + final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY; + final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION; + + // Test of invoking delete on master through slave proxy + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + wTx.delete(storeType, instanceIdentifier); + wTx.cancel(); + + verify(readWriteTx, timeout(2000)).delete(storeType, instanceIdentifier); + } + + @Test + public void testSubmit() throws Exception { + final CheckedFuture resultSubmit = Futures.immediateCheckedFuture(null); + doReturn(resultSubmit).when(readWriteTx).submit(); + + // Without Tx + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + + final CheckedFuture resultSubmitResponse = wTx.submit(); + + final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertNull(result); + } + + @Test + public void testSubmitWithOperation() throws Exception { + final CheckedFuture resultSubmitTx = Futures.immediateCheckedFuture(null); + doReturn(resultSubmitTx).when(readWriteTx).submit(); + // With Tx + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + wTx.delete(LogicalDatastoreType.CONFIGURATION, + YangInstanceIdentifier.EMPTY); + + final CheckedFuture resultSubmitTxResponse = wTx.submit(); + + final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertNull(resultTx); + } + + @Test + public void testSubmitFail() throws Exception { + final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null); + final CheckedFuture resultThrowable = + Futures.immediateFailedCheckedFuture(throwable); + doReturn(resultThrowable).when(readWriteTx).submit(); + + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + wTx.delete(LogicalDatastoreType.CONFIGURATION, + YangInstanceIdentifier.EMPTY); + final CheckedFuture resultThrowableResponse = + wTx.submit(); + exception.expect(TransactionCommitFailedException.class); + resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + } + + @Test + public void testCancel() throws Exception { + doReturn(true).when(readWriteTx).cancel(); + + // Without Tx + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + final Boolean resultFalseNoTx = wTx.cancel(); + assertEquals(true, resultFalseNoTx); + } + + @Test + public void testCancelWithOperation() throws Exception { + doReturn(true).when(readWriteTx).cancel(); + + // With Tx, readWriteTx test + final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction(); + wTx.delete(LogicalDatastoreType.CONFIGURATION, + YangInstanceIdentifier.EMPTY); + + final Boolean resultTrue = wTx.cancel(); + assertEquals(true, resultTrue); + + final Boolean resultFalse = wTx.cancel(); + assertEquals(false, resultFalse); + } + + @Test + public void testRead() throws Exception { + // Message: NormalizedNodeMessage + final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname"))) + .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build(); + final CheckedFuture>, ReadFailedException> resultNormalizedNodeMessage = + Futures.immediateCheckedFuture(Optional.of(outputNode)); + doReturn(resultNormalizedNodeMessage).when(readWriteTx).read(storeType, instanceIdentifier); + + final CheckedFuture>, ReadFailedException> resultNodeMessageResponse = + slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier); + + final Optional> resultNodeMessage = + resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertTrue(resultNodeMessage.isPresent()); + assertEquals(resultNodeMessage.get(), outputNode); + } + + @Test + public void testReadEmpty() throws Exception { + // Message: EmptyReadResponse + final CheckedFuture>, ReadFailedException> resultEmpty = + Futures.immediateCheckedFuture(Optional.absent()); + doReturn(resultEmpty).when(readWriteTx).read(storeType, instanceIdentifier); + + final CheckedFuture>, ReadFailedException> resultEmptyResponse = + slaveDataBroker.newReadWriteTransaction().read(storeType, + instanceIdentifier); + + final Optional> resultEmptyMessage = + resultEmptyResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertEquals(resultEmptyMessage, Optional.absent()); + } + + @Test + public void testReadFail() throws Exception { + // Message: Throwable + final ReadFailedException readFailedException = new ReadFailedException("Fail", null); + final CheckedFuture>, ReadFailedException> resultThrowable = + Futures.immediateFailedCheckedFuture(readFailedException); + + doReturn(resultThrowable).when(readWriteTx).read(storeType, instanceIdentifier); + + final CheckedFuture>, ReadFailedException> resultThrowableResponse = + slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier); + + exception.expect(ReadFailedException.class); + resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + } + + @Test + public void testExist() throws Exception { + // Message: True + final CheckedFuture resultTrue = + Futures.immediateCheckedFuture(true); + doReturn(resultTrue).when(readWriteTx).exists(storeType, instanceIdentifier); + + final CheckedFuture trueResponse = + slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier); + + final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertEquals(true, trueMessage); + } + + @Test + public void testExistsNull() throws Exception { + // Message: False, result null + final CheckedFuture resultNull = Futures.immediateCheckedFuture(null); + doReturn(resultNull).when(readWriteTx).exists(storeType, instanceIdentifier); + + final CheckedFuture nullResponse = + slaveDataBroker.newReadWriteTransaction().exists(storeType, + instanceIdentifier); + + final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertEquals(false, nullFalseMessage); + } + + @Test + public void testExistsFalse() throws Exception { + // Message: False + final CheckedFuture resultFalse = Futures.immediateCheckedFuture(false); + doReturn(resultFalse).when(readWriteTx).exists(storeType, instanceIdentifier); + + final CheckedFuture falseResponse = + slaveDataBroker.newReadWriteTransaction().exists(storeType, + instanceIdentifier); + + final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertEquals(false, falseMessage); + } + + @Test + public void testExistsFail() throws Exception { + // Message: Throwable + final ReadFailedException readFailedException = new ReadFailedException("Fail", null); + final CheckedFuture resultThrowable = + Futures.immediateFailedCheckedFuture(readFailedException); + doReturn(resultThrowable).when(readWriteTx).exists(storeType, instanceIdentifier); + + final CheckedFuture resultThrowableResponse = + slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier); + + exception.expect(ReadFailedException.class); + resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS); + } + + private void initializeDataTest() throws Exception { + final Future initialDataToActor = + Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers, + domRpcService), TIMEOUT); + + final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); + + assertTrue(success instanceof MasterActorDataInitialized); + } + +} -- 2.36.6