Convert ProxyDOMDataBroker tx creation to async 18/72118/6
authorTom Pantelis <tompantelis@gmail.com>
Mon, 21 May 2018 14:28:52 +0000 (10:28 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 4 Jun 2018 22:26:56 +0000 (18:26 -0400)
ProxyReadWriteTransaction was modified to take the ActorRef
Future from the tx creation message and add a callback when
complete. Transaction operations prior to completion are
queued and replayed to a ProxyTransactionFacade instance
once the Future is complete. For successful completion, an
ActorProxyTransactionFacade is created that interfaces
with the master ActorRef. On failed Future, a
FailedProxyTransactionFacade is created that returns a
failed Future for read, exists and commit operations.

The ProxyReadAdapter and ProxyWriteAdapter were removed
in lieu of ProxyTransactionFacade which combines the
read/write operations for simplicity.

Some simple actor response messages,
eg NewWriteTransactionReply, were removed in lieu of
returning the payload or exception via
akka.actor.Status.Success or akka.actor.Status.Failure.
This simplifies caller response handling as akka
automatically unwraps Success and Failure in callbacks
and synchronous await.

UTs were added, modified and removed corresponding to
the changes.

Change-Id: I0bd2a931d91ded97ebba7ccc207d51bd6474a41c
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
32 files changed:
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ActorProxyTransactionFacade.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/FailedProxyTransactionFacade.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyTransactionFacade.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitFailedReply.java [deleted file]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java [deleted file]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBrokerTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTestAdapter.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTestAdapter.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java [deleted file]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java [deleted file]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java [deleted file]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java [deleted file]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java [deleted file]

index ac77b7b32adbd6687aa0392ffb9aaa2762e70b33..3a70ed700cb847ceecec8dde871dd13df8357868 100644 (file)
@@ -130,7 +130,7 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
         // We need to create ProxyDOMDataBroker so accessing mountpoint
         // on leader node would be same as on follower node
         final ProxyDOMDataBroker proxyDataBroker =
-                new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
+                new ProxyDOMDataBroker(id, masterActorRef, actorSystem.dispatcher(), actorResponseWaitTime);
         salProvider.getMountInstance()
                 .onTopologyDeviceConnected(currentSchemaContext, proxyDataBroker, deviceRpc, notificationService);
     }
index 1c7429a45cf3893eaf60f21f2b21e9af90c63b0c..b4cbdd06d4e51f0e2b0be2b6184e277ea7552648 100644 (file)
@@ -9,10 +9,8 @@
 package org.opendaylight.netconf.topology.singleton.impl;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import com.google.common.base.Verify;
 import java.util.Collections;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -26,14 +24,10 @@ import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction;
 import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
-import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
 public class ProxyDOMDataBroker implements DOMDataBroker {
@@ -41,21 +35,21 @@ public class ProxyDOMDataBroker implements DOMDataBroker {
     private final Timeout askTimeout;
     private final RemoteDeviceId id;
     private final ActorRef masterNode;
-    private final ActorSystem actorSystem;
+    private final ExecutionContext executionContext;
 
     /**
      * Constructor for {@code ProxyDOMDataBroker}.
      *
-     * @param actorSystem system
      * @param id          id
      * @param masterNode  {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref
+     * @param executionContext ExecutionContext
      * @param askTimeout  ask timeout
      */
-    public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
-                              final ActorRef masterNode, final Timeout askTimeout) {
+    public ProxyDOMDataBroker(final RemoteDeviceId id, final ActorRef masterNode,
+            final ExecutionContext executionContext, final Timeout askTimeout) {
         this.id = id;
         this.masterNode = masterNode;
-        this.actorSystem = actorSystem;
+        this.executionContext = executionContext;
         this.askTimeout = askTimeout;
     }
 
@@ -63,60 +57,21 @@ public class ProxyDOMDataBroker implements DOMDataBroker {
     @Override
     public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
         final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadTransactionRequest(), askTimeout);
-        final Object msg;
-        try {
-            msg = Await.result(txActorFuture, askTimeout.duration());
-        } catch (Exception e) {
-            throw new IllegalStateException("Can't create ProxyReadTransaction", e);
-        }
-
-        if (msg instanceof Exception) {
-            throw new IllegalStateException("Can't create ProxyReadTransaction", (Exception) msg);
-        }
-
-        Verify.verify(msg instanceof NewReadTransactionReply);
-        final NewReadTransactionReply reply = (NewReadTransactionReply) msg;
-        return new ProxyReadTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        return new ProxyReadTransaction(id, txActorFuture, executionContext, askTimeout);
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
     public DOMDataReadWriteTransaction newReadWriteTransaction() {
         final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadWriteTransactionRequest(), askTimeout);
-        final Object msg;
-        try {
-            msg = Await.result(txActorFuture, askTimeout.duration());
-        } catch (Exception e) {
-            throw new IllegalStateException("Can't create ProxyReadWriteTransaction", e);
-        }
-
-        if (msg instanceof Exception) {
-            throw new IllegalStateException("Can't create ProxyReadWriteTransaction", (Exception) msg);
-        }
-
-        Verify.verify(msg instanceof NewReadWriteTransactionReply);
-        final NewReadWriteTransactionReply reply = (NewReadWriteTransactionReply) msg;
-        return new ProxyReadWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        return new ProxyReadWriteTransaction(id, txActorFuture, executionContext, askTimeout);
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
     public DOMDataWriteTransaction newWriteOnlyTransaction() {
         final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewWriteTransactionRequest(), askTimeout);
-        final Object msg;
-        try {
-            msg = Await.result(txActorFuture, askTimeout.duration());
-        } catch (Exception e) {
-            throw new IllegalStateException("Can't create ProxyWriteTransaction", e);
-        }
-
-        if (msg instanceof Exception) {
-            throw new IllegalStateException("Can't create ProxyWriteTransaction", (Exception) msg);
-        }
-
-        Verify.verify(msg instanceof NewWriteTransactionReply);
-        final NewWriteTransactionReply reply = (NewWriteTransactionReply) msg;
-        return new ProxyWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        return new ProxyReadWriteTransaction(id, txActorFuture, executionContext, askTimeout);
     }
 
     @Override
index 795ba78c75c3235d11b80f1322fad208a406424c..d2d8c5f5a4a9ec6e4773079913af561d123e6ef6 100644 (file)
@@ -50,7 +50,7 @@ public class SlaveSalFacade {
         final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
         final ProxyDOMDataBroker netconfDeviceDataBroker =
-                new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
+                new ProxyDOMDataBroker(id, masterActorRef, actorSystem.dispatcher(), actorResponseWaitTime);
 
         salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker,
                 deviceRpc, notificationService);
index 67d0b52a7c9bb668181aa3da72a6f6d020c46255..e8415f62fed3be1de4991583c6bba77c1c1a7e6d 100644 (file)
@@ -56,11 +56,8 @@ import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSource
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -156,25 +153,23 @@ public class NetconfNodeActor extends AbstractUntypedActor {
             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
 
         } else if (message instanceof NewReadTransactionRequest) { // master
-
-            sender().tell(new NewReadTransactionReply(readTxActor), self());
-
+            sender().tell(new Success(readTxActor), self());
         } else if (message instanceof NewWriteTransactionRequest) { // master
             try {
                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
-                sender().tell(new NewWriteTransactionReply(txActor), self());
+                sender().tell(new Success(txActor), self());
             } catch (final Exception t) {
-                sender().tell(t, self());
+                sender().tell(new Failure(t), self());
             }
 
         } else if (message instanceof NewReadWriteTransactionRequest) {
             try {
                 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
-                sender().tell(new NewReadWriteTransactionReply(txActor), self());
+                sender().tell(new Success(txActor), self());
             } catch (final Exception t) {
-                sender().tell(t, self());
+                sender().tell(new Failure(t), self());
             }
         } else if (message instanceof InvokeRpcMessage) { // master
             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
@@ -223,6 +218,7 @@ public class NetconfNodeActor extends AbstractUntypedActor {
             @Override
             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
                 try {
+                    LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
                 } catch (IOException e) {
                     sender.tell(new Failure(e), getSelf());
@@ -231,6 +227,7 @@ public class NetconfNodeActor extends AbstractUntypedActor {
 
             @Override
             public void onFailure(@Nonnull final Throwable throwable) {
+                LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
                 sender.tell(new Failure(throwable), getSelf());
             }
         }, MoreExecutors.directExecutor());
index 6b79f621257ad0abcc2a7d19996b41db4ce86325..e3fb37f2198f6118065ed94b7fa6486e786b604c 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
 import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
@@ -66,7 +67,7 @@ class ReadAdapter {
 
             @Override
             public void onFailure(@Nonnull final Throwable throwable) {
-                sender.tell(throwable, self);
+                sender.tell(new Failure(throwable), self);
             }
         }, MoreExecutors.directExecutor());
     }
@@ -86,7 +87,7 @@ class ReadAdapter {
 
             @Override
             public void onFailure(@Nonnull final Throwable throwable) {
-                sender.tell(throwable, self);
+                sender.tell(new Failure(throwable), self);
             }
         }, MoreExecutors.directExecutor());
     }
index a214dd4fc71a7a45383f7f78e63686a648ca9e0a..250b893099cde703440be130d0058c16b969a658 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.netconf.topology.singleton.impl.actors;
 
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -22,8 +24,6 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelR
 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,12 +50,12 @@ class WriteAdapter {
         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                requester.tell(new SubmitReply(), self);
+                requester.tell(new Success(null), self);
             }
 
             @Override
             public void onFailure(@Nonnull final Throwable throwable) {
-                requester.tell(new SubmitFailedReply(throwable), self);
+                requester.tell(new Failure(throwable), self);
             }
         }, MoreExecutors.directExecutor());
     }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ActorProxyTransactionFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ActorProxyTransactionFacade.java
new file mode 100644 (file)
index 0000000..3ab1cd9
--- /dev/null
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Objects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+/**
+ * ProxyTransactionFacade implementation that interfaces with an actor.
+ *
+ * @author Thomas Pantelis
+ */
+class ActorProxyTransactionFacade implements ProxyTransactionFacade {
+    private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class);
+
+    private final ActorRef masterTxActor;
+    private final RemoteDeviceId id;
+    private final ExecutionContext executionContext;
+    private final Timeout askTimeout;
+
+    ActorProxyTransactionFacade(ActorRef masterTxActor, RemoteDeviceId id, ExecutionContext executionContext,
+            Timeout askTimeout) {
+        this.masterTxActor = Objects.requireNonNull(masterTxActor);
+        this.id = Objects.requireNonNull(id);
+        this.executionContext = Objects.requireNonNull(executionContext);
+        this.askTimeout = Objects.requireNonNull(askTimeout);
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return id;
+    }
+
+    @Override
+    public boolean cancel() {
+        LOG.debug("{}: Cancel via actor {}", id, masterTxActor);
+
+        final Future<Object> future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
+
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                if (failure != null) {
+                    LOG.warn("{}: Cancel failed", id, failure);
+                    return;
+                }
+
+                LOG.debug("{}: Cancel succeeded", id);
+            }
+        }, executionContext);
+
+        return true;
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(LogicalDatastoreType store,
+            YangInstanceIdentifier path) {
+        LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
+
+        final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
+
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                if (failure != null) {
+                    LOG.debug("{}: Read {} {} failed", id, store, path, failure);
+                    settableFuture.setException(processFailure(failure));
+                    return;
+                }
+
+                LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
+
+                if (response instanceof EmptyReadResponse) {
+                    settableFuture.set(Optional.absent());
+                    return;
+                }
+
+                if (response instanceof NormalizedNodeMessage) {
+                    final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
+                    settableFuture.set(Optional.of(data.getNode()));
+                }
+            }
+        }, executionContext);
+
+        return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(LogicalDatastoreType store, YangInstanceIdentifier path) {
+        LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
+
+        final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
+
+        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                if (failure != null) {
+                    LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
+                    settableFuture.setException(processFailure(failure));
+                    return;
+                }
+
+                LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response);
+
+                settableFuture.set((Boolean) response);
+            }
+        }, executionContext);
+
+        return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
+        LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
+        masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
+    }
+
+    @Override
+    public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
+        masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
+    }
+
+    @Override
+    public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
+        masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
+    }
+
+    @Override
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+        LOG.debug("{}: Commit via actor {}", id, masterTxActor);
+
+        final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
+
+        final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object response) {
+                if (failure != null) {
+                    LOG.debug("{}: Commit failed", id, failure);
+                    settableFuture.setException(newTransactionCommitFailedException(processFailure(failure)));
+                    return;
+                }
+
+                LOG.debug("{}: Commit succeeded", id);
+
+                settableFuture.set(CommitInfo.empty());
+            }
+        }, executionContext);
+
+        return settableFuture;
+    }
+
+    private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) {
+        return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed", getIdentifier()),
+                failure);
+    }
+
+    private Throwable processFailure(Throwable failure) {
+        if (failure instanceof AskTimeoutException) {
+            return NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure);
+        }
+
+        return failure;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/FailedProxyTransactionFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/FailedProxyTransactionFacade.java
new file mode 100644 (file)
index 0000000..97d51cb
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.Objects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of ProxyTransactionFacade that fails each request.
+ *
+ * @author Thomas Pantelis
+ */
+class FailedProxyTransactionFacade implements ProxyTransactionFacade {
+    private static final Logger LOG = LoggerFactory.getLogger(FailedProxyTransactionFacade.class);
+
+    private final RemoteDeviceId id;
+    private final Throwable failure;
+
+    FailedProxyTransactionFacade(RemoteDeviceId id, Throwable failure) {
+        this.id = Objects.requireNonNull(id);
+        this.failure = Objects.requireNonNull(failure);
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return id;
+    }
+
+    @Override
+    public boolean cancel() {
+        return true;
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(LogicalDatastoreType store,
+            YangInstanceIdentifier path) {
+        LOG.debug("{}: Read {} {} - failure {}", id, store, path, failure);
+        return Futures.immediateFailedCheckedFuture(ReadFailedException.MAPPER.apply(
+                failure instanceof Exception ? (Exception)failure : new ReadFailedException("read", failure)));
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(LogicalDatastoreType store, YangInstanceIdentifier path) {
+        LOG.debug("{}: Exists {} {} - failure {}", id, store, path, failure);
+        return Futures.immediateFailedCheckedFuture(ReadFailedException.MAPPER.apply(
+                failure instanceof Exception ? (Exception)failure : new ReadFailedException("read", failure)));
+    }
+
+    @Override
+    public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
+        LOG.debug("{}: Delete {} {} - failure {}", id, store, path, failure);
+    }
+
+    @Override
+    public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        LOG.debug("{}: Put {} {} - failure {}", id, store, path, failure);
+    }
+
+    @Override
+    public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        LOG.debug("{}: Merge {} {} - failure {}", id, store, path, failure);
+    }
+
+    @Override
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+        LOG.debug("{}: Commit {} {} - failure {}", id, failure);
+        return FluentFuture.from(Futures.immediateFailedFuture(failure instanceof Exception
+                ? AsyncWriteTransaction.SUBMIT_EXCEPTION_MAPPER.apply((Exception)failure)
+                        : new TransactionCommitFailedException("commit", failure)));
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java
deleted file mode 100644 (file)
index 0f23c11..0000000
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-class ProxyReadAdapter {
-    private static final Logger LOG = LoggerFactory.getLogger(ProxyReadAdapter.class);
-
-    private final ActorRef masterTxActor;
-    private final RemoteDeviceId id;
-    private final ActorSystem actorSystem;
-    private final Timeout askTimeout;
-
-    ProxyReadAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
-                            final Timeout askTimeout) {
-        this.masterTxActor = masterTxActor;
-        this.id = id;
-        this.actorSystem = actorSystem;
-        this.askTimeout = askTimeout;
-    }
-
-    public void close() {
-        //noop
-    }
-
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
-                                                                                   final YangInstanceIdentifier path) {
-        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
-        final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
-        future.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure,
-                                   final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
-                    settableFuture.setException(exception);
-                    return;
-                }
-                if (success instanceof Throwable) { // Error sended by master
-                    settableFuture.setException((Throwable) success);
-                    return;
-                }
-                if (success instanceof EmptyReadResponse) {
-                    settableFuture.set(Optional.absent());
-                    return;
-                }
-                if (success instanceof NormalizedNodeMessage) {
-                    final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
-                    settableFuture.set(Optional.of(data.getNode()));
-                }
-            }
-        }, actorSystem.dispatcher());
-        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
-    }
-
-    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
-                                                              final YangInstanceIdentifier path) {
-        final Future<Object> existsScalaFuture =
-                Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
-
-        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
-        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
-        existsScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
-                    settableFuture.setException(exception);
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    settableFuture.setException((Throwable) success);
-                    return;
-                }
-                settableFuture.set((Boolean) success);
-            }
-        }, actorSystem.dispatcher());
-        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
-    }
-
-}
index b73c9466e153aac0a64fa610c369cb20c154621a..ed972c1d5d3689cdbeed2203102a9448424be0d9 100644 (file)
@@ -9,58 +9,25 @@
 package org.opendaylight.netconf.topology.singleton.impl.tx;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
 
 /**
  * ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master
  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor}.
  */
-public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
+public class ProxyReadTransaction extends ProxyReadWriteTransaction implements DOMDataReadOnlyTransaction {
 
-    private final ProxyReadAdapter delegate;
-
-    /**
-     * Constructor for {@code ProxyReadTransaction}.
-     *
-     * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref
-     * @param id            device id
-     * @param actorSystem   system
-     * @param askTimeout    timeout
-     */
-    public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
-                                final Timeout askTimeout) {
-        delegate = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
+    public ProxyReadTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
+            final ExecutionContext executionContext, final Timeout askTimeout) {
+        super(id, masterTxActorFuture, executionContext, askTimeout);
     }
 
     @Override
     public void close() {
-        delegate.close();
-    }
-
-    @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
-                                                                                   final YangInstanceIdentifier path) {
-        return delegate.read(store, path);
-    }
-
-    @Override
-    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
-                                                              final YangInstanceIdentifier path) {
-        return delegate.exists(store, path);
-    }
-
-
-    @Override
-    public Object getIdentifier() {
-        return this;
+        // noop
     }
 }
index 5cd241831252d965b25837ed1681b37ba9e8d8f8..82fbc41e05cc9b553c9859e6c59b1ea34d62a7fb 100644 (file)
 package org.opendaylight.netconf.topology.singleton.impl.tx;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
 
 /**
  * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
  */
 public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
 
-    private final ProxyReadAdapter delegateRead;
-    private final ProxyWriteAdapter delegateWrite;
-
-    /**
-     * Constructor for {@code ProxyReadWriteTransaction}.
-     *
-     * @param masterTxActor
-     * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor} ref
-     * @param id            device id
-     * @param actorSystem   system
-     * @param askTimeout    timeout
-     */
-    public ProxyReadWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id,
-                                     final ActorSystem actorSystem, final Timeout askTimeout) {
-        delegateRead = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
-        delegateWrite = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
+    private final RemoteDeviceId id;
+    private final AtomicBoolean opened = new AtomicBoolean(true);
+
+    @GuardedBy("queuedTxOperations")
+    private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
+
+    private volatile ProxyTransactionFacade transactionFacade;
+
+    public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
+            final ExecutionContext executionContext, final Timeout askTimeout) {
+        this.id = id;
+
+        masterTxActorFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object masterTxActor) {
+                final ProxyTransactionFacade newTransactionFacade;
+                if (failure != null) {
+                    LOG.debug("{}: Failed to obtain master actor", id, failure);
+                    newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
+                } else {
+                    LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
+                    newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
+                            executionContext, askTimeout);
+                }
+
+                executePriorTransactionOperations(newTransactionFacade);
+            }
+        }, executionContext);
     }
 
     @Override
     public boolean cancel() {
-        return delegateWrite.cancel();
+        if (!opened.compareAndSet(true, false)) {
+            return false;
+        }
+
+        processTransactionOperation(facade -> facade.cancel());
+        return true;
     }
 
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
-                                                                                   final YangInstanceIdentifier path) {
-        return delegateRead.read(store, path);
-    }
+            final YangInstanceIdentifier path) {
+        LOG.debug("{}: Read {} {}", id, store, path);
 
-    @Override
-    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
-                                                              final YangInstanceIdentifier path) {
-        return delegateRead.exists(store, path);
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+        processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
+        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
-        delegateWrite.delete(store, path);
-    }
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+            final YangInstanceIdentifier path) {
+        LOG.debug("{}: Exists {} {}", id, store, path);
 
-    @Override
-    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
-            new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
-                @Override
-                protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
-                    return new TransactionCommitFailedException(message, cause);
-                }
-            });
+        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+        processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
+        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     @Override
-    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
-        return delegateWrite.commit(getIdentifier());
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        checkOpen();
+        LOG.debug("{}: Delete {} {}", id, store, path);
+        processTransactionOperation(facade -> facade.delete(store, path));
     }
 
     @Override
     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
                     final NormalizedNode<?, ?> data) {
-        delegateWrite.put(store, path, data, getIdentifier());
+        checkOpen();
+        LOG.debug("{}: Put {} {}", id, store, path);
+        processTransactionOperation(facade -> facade.put(store, path, data));
     }
 
     @Override
     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
                       final NormalizedNode<?, ?> data) {
-        delegateWrite.merge(store, path, data, getIdentifier());
+        checkOpen();
+        LOG.debug("{}: Merge {} {}", id, store, path);
+        processTransactionOperation(facade -> facade.merge(store, path, data));
+    }
+
+    @Override
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+        Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
+        LOG.debug("{}: Commit", id);
+
+        final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
+        processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
+        return returnFuture;
     }
 
     @Override
     public Object getIdentifier() {
-        return this;
+        return id;
+    }
+
+    private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
+        final ProxyTransactionFacade facadeOnEntry;
+        synchronized (queuedTxOperations) {
+            if (transactionFacade == null) {
+                LOG.debug("{}: Queuing transaction operation", id);
+
+                queuedTxOperations.add(operation);
+                facadeOnEntry = null;
+            }  else {
+                facadeOnEntry = transactionFacade;
+            }
+        }
+
+        if (facadeOnEntry != null) {
+            operation.accept(facadeOnEntry);
+        }
+    }
+
+    private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
+        while (true) {
+            // Access to queuedTxOperations and transactionFacade must be protected and atomic
+            // (ie synchronized) with respect to #processTransactionOperation to handle timing
+            // issues and ensure no ProxyTransactionFacade is missed and that they are processed
+            // in the order they occurred.
+
+            // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
+            // in case a transaction operation results in another transaction operation being
+            // queued (eg a put operation from a client read Future callback that is notified
+            // synchronously).
+            final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
+            synchronized (queuedTxOperations) {
+                if (queuedTxOperations.isEmpty()) {
+                    // We're done invoking the transaction operations so we can now publish the
+                    // ProxyTransactionFacade.
+                    transactionFacade = newTransactionFacade;
+                    break;
+                }
+
+                operationsBatch = new ArrayList<>(queuedTxOperations);
+                queuedTxOperations.clear();
+            }
+
+            // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
+            for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
+                oper.accept(newTransactionFacade);
+            }
+        }
+    }
+
+    private void checkOpen() {
+        Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);
     }
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyTransactionFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyTransactionFacade.java
new file mode 100644 (file)
index 0000000..babac0d
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+
+/**
+ * Interfaces with a transaction back-end.
+ *
+ * @author Thomas Pantelis
+ */
+interface ProxyTransactionFacade extends DOMDataReadWriteTransaction {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java
deleted file mode 100644 (file)
index ccd58fb..0000000
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-
-public class ProxyWriteAdapter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteAdapter.class);
-
-    private final ActorRef masterTxActor;
-    private final RemoteDeviceId id;
-    private final ActorSystem actorSystem;
-    private final AtomicBoolean opened = new AtomicBoolean(true);
-    private final Timeout askTimeout;
-
-    public ProxyWriteAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
-                             final Timeout askTimeout) {
-        this.masterTxActor = masterTxActor;
-        this.id = id;
-        this.actorSystem = actorSystem;
-        this.askTimeout = askTimeout;
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public boolean cancel() {
-        if (!opened.compareAndSet(true, false)) {
-            return false;
-        }
-        final Future<Object> cancelScalaFuture =
-                Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
-
-        LOG.trace("{}: Cancel {} via NETCONF", id);
-
-        try {
-            // here must be Await because AsyncWriteTransaction do not return future
-            return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
-        } catch (final Exception e) {
-            return false;
-        }
-    }
-
-    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit(final Object identifier) {
-        if (!opened.compareAndSet(true, false)) {
-            throw new IllegalStateException(id + ": Transaction" + identifier + " is closed");
-        }
-        final Future<Object> submitScalaFuture =
-                Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
-
-        LOG.trace("{}: Commit {} via NETCONF", id);
-
-        final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
-        submitScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    settableFuture.setException(newTransactionCommitFailedException(
-                            NetconfTopologyUtils.createMasterIsDownException(id), identifier));
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    settableFuture.setException(newTransactionCommitFailedException((Throwable) success, identifier));
-                } else {
-                    if (success instanceof SubmitFailedReply) {
-                        LOG.error("{}: Transaction was not submitted because already closed.", id);
-                        settableFuture.setException(newTransactionCommitFailedException(
-                                ((SubmitFailedReply) success).getThrowable(), identifier));
-                        return;
-                    }
-
-                    settableFuture.set(CommitInfo.empty());
-                }
-            }
-        }, actorSystem.dispatcher());
-
-        return FluentFuture.from(settableFuture);
-    }
-
-    private static TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure,
-            final Object identifier) {
-        return new TransactionCommitFailedException(
-                String.format("Commit of transaction %s failed", identifier), failure);
-    }
-
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
-        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
-        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
-        masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
-    }
-
-    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
-                    final NormalizedNode<?, ?> data, final Object identifier) {
-        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
-        final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
-        LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, path, data);
-        masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
-    }
-
-    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
-                      final NormalizedNode<?, ?> data, final Object identifier) {
-        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
-        final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
-        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data);
-        masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
-    }
-
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java
deleted file mode 100644 (file)
index d284023..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
- * {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor}.
- */
-public class ProxyWriteTransaction implements DOMDataWriteTransaction {
-
-    private final ProxyWriteAdapter proxyWriteAdapter;
-
-    /**
-     * Constructor for {@code ProxyWriteTransaction}.
-     *
-     * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
-     * @param id            device id
-     * @param actorSystem   system
-     * @param askTimeout    timeout
-     */
-    public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
-                                 final Timeout askTimeout) {
-        proxyWriteAdapter = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    @Override
-    public boolean cancel() {
-        return proxyWriteAdapter.cancel();
-    }
-
-    @Override
-    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
-            new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
-                @Override
-                protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
-                    return new TransactionCommitFailedException(message, cause);
-                }
-            });
-    }
-
-    @Override
-    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
-        return proxyWriteAdapter.commit(getIdentifier());
-    }
-
-    @Override
-    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
-        proxyWriteAdapter.delete(store, identifier);
-    }
-
-    @Override
-    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
-                    final NormalizedNode<?, ?> data) {
-        proxyWriteAdapter.put(store, identifier, data, getIdentifier());
-    }
-
-    @Override
-    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
-                      final NormalizedNode<?, ?> data) {
-        proxyWriteAdapter.merge(store, identifier, data, getIdentifier());
-    }
-
-    @Override
-    public Object getIdentifier() {
-        return this;
-    }
-}
index 474619ee24d05d9ce4125ba3aaa99c532761c453..eb34152e937e75de09e00651849ba617a6fc6618 100644 (file)
@@ -232,8 +232,8 @@ public final class NetconfTopologyUtils {
         return createTopologyListPath(topologyId).child(Node.class);
     }
 
-    public static DocumentedException createMasterIsDownException(final RemoteDeviceId id) {
-        return new DocumentedException(id + ":Master is down. Please try again.",
+    public static DocumentedException createMasterIsDownException(final RemoteDeviceId id, final Exception cause) {
+        return new DocumentedException(id + ":Master is down. Please try again.", cause,
                 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
                 DocumentedException.ErrorSeverity.WARNING);
     }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadTransactionReply.java
deleted file mode 100644 (file)
index f2fcd0c..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import akka.actor.ActorRef;
-import java.io.Serializable;
-
-public class NewReadTransactionReply implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final ActorRef txActor;
-
-    public NewReadTransactionReply(final ActorRef txActor) {
-        this.txActor = txActor;
-    }
-
-    public ActorRef getTxActor() {
-        return txActor;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java
deleted file mode 100644 (file)
index 08e2930..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import akka.actor.ActorRef;
-import java.io.Serializable;
-
-public class NewReadWriteTransactionReply implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final ActorRef txActor;
-
-    public NewReadWriteTransactionReply(final ActorRef txActor) {
-        this.txActor = txActor;
-    }
-
-    public ActorRef getTxActor() {
-        return txActor;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewWriteTransactionReply.java
deleted file mode 100644 (file)
index 8d5e818..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import akka.actor.ActorRef;
-import java.io.Serializable;
-
-public class NewWriteTransactionReply implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final ActorRef txActor;
-
-    public NewWriteTransactionReply(final ActorRef txActor) {
-        this.txActor = txActor;
-    }
-
-    public ActorRef getTxActor() {
-        return txActor;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitFailedReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitFailedReply.java
deleted file mode 100644 (file)
index 7dfc19c..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import java.io.Serializable;
-
-/**
- * Message sent from master back to the slave when submit fails, with the offending exception attached.
- */
-public class SubmitFailedReply implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final Throwable throwable;
-
-    public SubmitFailedReply(final Throwable throwable) {
-        this.throwable = throwable;
-    }
-
-    public Throwable getThrowable() {
-        return throwable;
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java
deleted file mode 100644 (file)
index 5ca306c..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import java.io.Serializable;
-
-/**
- * Message sent from master back to the slave when submit is successfully performed.
- */
-public class SubmitReply implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBrokerTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBrokerTest.java
new file mode 100644 (file)
index 0000000..b1bc972
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import akka.actor.ActorSystem;
+import akka.actor.Status.Success;
+import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for ProxyDOMDataBroker.
+ *
+ * @author Thomas Pantelis
+ */
+public class ProxyDOMDataBrokerTest {
+    private static final RemoteDeviceId DEVICE_ID =
+            new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+
+    private static ActorSystem system = ActorSystem.apply();
+
+    private final TestProbe masterActor = new TestProbe(system);
+    private final ProxyDOMDataBroker proxy = new ProxyDOMDataBroker(DEVICE_ID, masterActor.ref(), system.dispatcher(),
+            Timeout.apply(5, TimeUnit.SECONDS));
+
+    @AfterClass
+    public static void staticTearDown() {
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
+    }
+
+    @Test
+    public void testNewReadOnlyTransaction() {
+        final DOMDataReadOnlyTransaction tx = proxy.newReadOnlyTransaction();
+        masterActor.expectMsgClass(NewReadTransactionRequest.class);
+        masterActor.reply(new Success(masterActor.ref()));
+
+        assertEquals(DEVICE_ID, tx.getIdentifier());
+
+        tx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+        masterActor.expectMsgClass(ReadRequest.class);
+    }
+
+    @Test
+    public void testNewWriteOnlyTransaction() {
+        final DOMDataWriteTransaction tx = proxy.newWriteOnlyTransaction();
+        masterActor.expectMsgClass(NewWriteTransactionRequest.class);
+        masterActor.reply(new Success(masterActor.ref()));
+
+        assertEquals(DEVICE_ID, tx.getIdentifier());
+
+        tx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+        masterActor.expectMsgClass(DeleteRequest.class);
+    }
+
+    @Test
+    public void testNewReadWriteTransaction() {
+        final DOMDataReadWriteTransaction tx = proxy.newReadWriteTransaction();
+        masterActor.expectMsgClass(NewReadWriteTransactionRequest.class);
+        masterActor.reply(new Success(masterActor.ref()));
+
+        assertEquals(DEVICE_ID, tx.getIdentifier());
+
+        tx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+        masterActor.expectMsgClass(ReadRequest.class);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreateTransactionChain() {
+        proxy.createTransactionChain(null);
+    }
+
+    @Test
+    public void testGetSupportedExtensions() {
+        assertTrue(proxy.getSupportedExtensions().isEmpty());
+    }
+}
index d41e6955f80fe55fff8aa7dd2404044dfce46880..d39a8ed8e3e17d253e6e272afec4350268d17cb0 100644 (file)
@@ -8,98 +8,29 @@
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-import org.junit.After;
+import akka.testkit.javadsl.TestKit;
+import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-
-public class ReadTransactionActorTest {
 
-    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
-    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+public class ReadTransactionActorTest extends ReadTransactionActorTestAdapter {
+    private static ActorSystem system = ActorSystem.apply();
 
     @Mock
-    private DOMDataReadOnlyTransaction deviceReadTx;
-    private TestProbe probe;
-    private ActorSystem system;
-    private TestActorRef<ReadTransactionActor> actorRef;
+    private DOMDataReadOnlyTransaction mockReadTx;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
-        system = ActorSystem.apply();
-        probe = TestProbe.apply(system);
-        actorRef = TestActorRef.create(system, ReadTransactionActor.props(deviceReadTx), "testA");
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system, null, true);
-    }
-
-    @Test
-    public void testRead() throws Exception {
-        final ContainerNode node = Builders.containerBuilder()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
-                .build();
-        when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
-        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
-        verify(deviceReadTx).read(STORE, PATH);
-        probe.expectMsgClass(NormalizedNodeMessage.class);
-    }
-
-    @Test
-    public void testReadEmpty() throws Exception {
-        when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
-        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
-        verify(deviceReadTx).read(STORE, PATH);
-        probe.expectMsgClass(EmptyReadResponse.class);
-    }
-
-    @Test
-    public void testReadFailure() throws Exception {
-        final ReadFailedException cause = new ReadFailedException("fail");
-        when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
-        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
-        verify(deviceReadTx).read(STORE, PATH);
-        probe.expectMsg(cause);
-    }
-
-    @Test
-    public void testExists() throws Exception {
-        when(deviceReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
-        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
-        verify(deviceReadTx).exists(STORE, PATH);
-        probe.expectMsg(true);
+        init(mockReadTx, system, TestActorRef.create(system, ReadTransactionActor.props(mockReadTx)));
     }
 
-    @Test
-    public void testExistsFailure() throws Exception {
-        final ReadFailedException cause = new ReadFailedException("fail");
-        when(deviceReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
-        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
-        verify(deviceReadTx).exists(STORE, PATH);
-        probe.expectMsg(cause);
+    @AfterClass
+    public static void staticTearDown() {
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
     }
-}
\ No newline at end of file
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTestAdapter.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActorTestAdapter.java
new file mode 100644 (file)
index 0000000..3d7f2b7
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status.Failure;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+/**
+ * Adapter for read transaction tests.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class ReadTransactionActorTestAdapter {
+    static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+    static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+    static final NormalizedNode<?, ?> NODE = Builders.containerBuilder()
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))).build();
+
+    private DOMDataReadTransaction mockReadTx;
+    private TestProbe probe;
+    private ActorRef actorRef;
+
+    public void init(DOMDataReadTransaction inMockReadTx, ActorSystem system, ActorRef inActorRef) {
+        this.mockReadTx = inMockReadTx;
+        this.probe = TestProbe.apply(system);
+        this.actorRef = inActorRef;
+    }
+
+    @Test
+    public void testRead() {
+        when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(NODE)));
+        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+
+        verify(mockReadTx).read(STORE, PATH);
+        final NormalizedNodeMessage response = probe.expectMsgClass(NormalizedNodeMessage.class);
+        assertEquals(NODE, response.getNode());
+    }
+
+    @Test
+    public void testReadEmpty() {
+        when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+
+        verify(mockReadTx).read(STORE, PATH);
+        probe.expectMsgClass(EmptyReadResponse.class);
+    }
+
+    @Test
+    public void testReadFailure() {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+
+        verify(mockReadTx).read(STORE, PATH);
+        final Failure response = probe.expectMsgClass(Failure.class);
+        assertEquals(cause, response.cause());
+    }
+
+    @Test
+    public void testExists() {
+        when(mockReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
+        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+
+        verify(mockReadTx).exists(STORE, PATH);
+        probe.expectMsg(true);
+    }
+
+    @Test
+    public void testExistsFailure() {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(mockReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+
+        verify(mockReadTx).exists(STORE, PATH);
+        final Failure response = probe.expectMsgClass(Failure.class);
+        assertEquals(cause, response.cause());
+    }
+}
index 2ccf8744497b6e590d23556e47f700fad48f33ed..dc4d81a8f6dd881a2c21651596bad8cade8f2378 100644 (file)
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
+import akka.testkit.javadsl.TestKit;
 import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
 public class ReadWriteTransactionActorTest {
-
-    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
-    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
-    private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+    private static ActorSystem system = ActorSystem.apply();
 
     @Mock
-    private DOMDataReadWriteTransaction deviceReadWriteTx;
-    private TestProbe probe;
-    private ActorSystem system;
-    private TestActorRef<WriteTransactionActor> actorRef;
-    private ContainerNode node;
+    private DOMDataReadWriteTransaction mockReadWriteTx;
+
+    private final ReadTransactionActorTestAdapter readTestAdapter = new ReadTransactionActorTestAdapter() {};
+    private final WriteTransactionActorTestAdapter writeTestAdapter = new WriteTransactionActorTestAdapter() {};
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
-        system = ActorSystem.apply();
-        probe = TestProbe.apply(system);
-        node = Builders.containerBuilder()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
-                .build();
-        actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(deviceReadWriteTx,
-                Duration.apply(2, TimeUnit.SECONDS)), "testA");
+        TestActorRef<?> actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(mockReadWriteTx,
+                Duration.apply(2, TimeUnit.SECONDS)));
+        readTestAdapter.init(mockReadWriteTx, system, actorRef);
+        writeTestAdapter.init(mockReadWriteTx, system, actorRef);
     }
 
-    @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system, null, true);
+    @AfterClass
+    public static void staticTearDown() {
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
     }
 
     @Test
-    public void testRead() throws Exception {
-        when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
-        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
-        verify(deviceReadWriteTx).read(STORE, PATH);
-        probe.expectMsgClass(NormalizedNodeMessage.class);
+    public void testRead() {
+        readTestAdapter.testRead();
     }
 
     @Test
-    public void testReadEmpty() throws Exception {
-        when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
-        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
-        verify(deviceReadWriteTx).read(STORE, PATH);
-        probe.expectMsgClass(EmptyReadResponse.class);
+    public void testReadEmpty() {
+        readTestAdapter.testReadEmpty();
     }
 
     @Test
-    public void testReadFailure() throws Exception {
-        final ReadFailedException cause = new ReadFailedException("fail");
-        when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
-        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
-        verify(deviceReadWriteTx).read(STORE, PATH);
-        probe.expectMsg(cause);
+    public void testReadFailure() {
+        readTestAdapter.testReadFailure();
     }
 
     @Test
-    public void testExists() throws Exception {
-        when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
-        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
-        verify(deviceReadWriteTx).exists(STORE, PATH);
-        probe.expectMsg(true);
+    public void testExists() {
+        readTestAdapter.testExists();
     }
 
     @Test
-    public void testExistsFailure() throws Exception {
-        final ReadFailedException cause = new ReadFailedException("fail");
-        when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
-        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
-        verify(deviceReadWriteTx).exists(STORE, PATH);
-        probe.expectMsg(cause);
+    public void testExistsFailure() {
+        readTestAdapter.testExistsFailure();
     }
 
     @Test
-    public void testPut() throws Exception {
-        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
-        actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
-        verify(deviceReadWriteTx).put(STORE, PATH, node);
+    public void testPut() {
+        writeTestAdapter.testPut();
     }
 
     @Test
-    public void testMerge() throws Exception {
-        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
-        actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
-        verify(deviceReadWriteTx).merge(STORE, PATH, node);
+    public void testMerge() {
+        writeTestAdapter.testMerge();
     }
 
     @Test
-    public void testDelete() throws Exception {
-        actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
-        verify(deviceReadWriteTx).delete(STORE, PATH);
+    public void testDelete() {
+        writeTestAdapter.testDelete();
     }
 
     @Test
     public void testCancel() throws Exception {
-        when(deviceReadWriteTx.cancel()).thenReturn(true);
-        final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
-        final Object result = Await.result(cancelFuture, TIMEOUT.duration());
-        Assert.assertTrue(result instanceof Boolean);
-        verify(deviceReadWriteTx).cancel();
-        Assert.assertTrue((Boolean) result);
+        writeTestAdapter.testCancel();
     }
 
     @Test
     public void testSubmit() throws Exception {
-        when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
-        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
-        final Object result = Await.result(submitFuture, TIMEOUT.duration());
-        Assert.assertTrue(result instanceof SubmitReply);
-        verify(deviceReadWriteTx).submit();
+        writeTestAdapter.testSubmit();
     }
 
     @Test
     public void testSubmitFail() throws Exception {
-        final RpcError rpcError =
-                RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
-        final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
-        when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
-        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
-        final Object result = Await.result(submitFuture, TIMEOUT.duration());
-        Assert.assertTrue(result instanceof SubmitFailedReply);
-        Assert.assertEquals(cause, ((SubmitFailedReply)result).getThrowable());
-        verify(deviceReadWriteTx).submit();
+        writeTestAdapter.testSubmitFail();
     }
 
     @Test
     public void testIdleTimeout() throws Exception {
-        final TestProbe testProbe = new TestProbe(system);
-        testProbe.watch(actorRef);
-        verify(deviceReadWriteTx, timeout(3000)).cancel();
-        testProbe.expectTerminated(actorRef, TIMEOUT.duration());
+        writeTestAdapter.testIdleTimeout();
     }
 }
index e21ec46e9c6cbb7dc9f6c5641729426a79729aaa..759c11209cfc1e3f5de56738816ca7fbe3be95f9 100644 (file)
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.util.concurrent.Futures;
+import akka.testkit.javadsl.TestKit;
 import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
+import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
-public class WriteTransactionActorTest {
-    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
-    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
-    private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+public class WriteTransactionActorTest extends WriteTransactionActorTestAdapter {
+    private static ActorSystem system = ActorSystem.apply();
 
     @Mock
-    private DOMDataWriteTransaction deviceWriteTx;
-    private TestProbe probe;
-    private ActorSystem system;
-    private TestActorRef<WriteTransactionActor> actorRef;
-    private NormalizedNode<?, ?> node;
+    private DOMDataWriteTransaction mockWriteTx;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
-        system = ActorSystem.apply();
-        probe = TestProbe.apply(system);
-        node = Builders.containerBuilder()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
-                .build();
-        actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx,
-                Duration.apply(2, TimeUnit.SECONDS)), "testA");
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system, null, true);
-    }
-
-    @Test
-    public void testPut() throws Exception {
-        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
-        actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
-        verify(deviceWriteTx).put(STORE, PATH, node);
-    }
-
-    @Test
-    public void testMerge() throws Exception {
-        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
-        actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
-        verify(deviceWriteTx).merge(STORE, PATH, node);
-    }
-
-    @Test
-    public void testDelete() throws Exception {
-        actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
-        verify(deviceWriteTx).delete(STORE, PATH);
-    }
-
-    @Test
-    public void testCancel() throws Exception {
-        when(deviceWriteTx.cancel()).thenReturn(true);
-        final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
-        final Object result = Await.result(cancelFuture, TIMEOUT.duration());
-        Assert.assertTrue(result instanceof Boolean);
-        verify(deviceWriteTx).cancel();
-        Assert.assertTrue((Boolean) result);
-    }
-
-    @Test
-    public void testSubmit() throws Exception {
-        when(deviceWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
-        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
-        final Object result = Await.result(submitFuture, TIMEOUT.duration());
-        Assert.assertTrue(result instanceof SubmitReply);
-        verify(deviceWriteTx).submit();
-    }
-
-    @Test
-    public void testSubmitFail() throws Exception {
-        final RpcError rpcError =
-                RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
-        final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
-        when(deviceWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
-        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
-        final Object result = Await.result(submitFuture, TIMEOUT.duration());
-        Assert.assertTrue(result instanceof SubmitFailedReply);
-        Assert.assertEquals(cause, ((SubmitFailedReply)result).getThrowable());
-        verify(deviceWriteTx).submit();
+        init(mockWriteTx, system, TestActorRef.create(system,
+                WriteTransactionActor.props(mockWriteTx, Duration.apply(2, TimeUnit.SECONDS))));
     }
 
-    @Test
-    public void testIdleTimeout() throws Exception {
-        final TestProbe testProbe = new TestProbe(system);
-        testProbe.watch(actorRef);
-        verify(deviceWriteTx, timeout(3000)).cancel();
-        testProbe.expectTerminated(actorRef, TIMEOUT.duration());
+    @AfterClass
+    public static void staticTearDown() {
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
     }
-
 }
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTestAdapter.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTestAdapter.java
new file mode 100644 (file)
index 0000000..14fc1d7
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.NODE;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.PATH;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.STORE;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.TIMEOUT;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.testkit.TestProbe;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * Adapter for write transaction tests.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class WriteTransactionActorTestAdapter {
+    private DOMDataWriteTransaction mockWriteTx;
+    private TestProbe probe;
+    private ActorRef actorRef;
+    private ActorSystem system;
+
+    public void init(DOMDataWriteTransaction inMockWriteTx, ActorSystem inSystem, ActorRef inActorRef) {
+        this.mockWriteTx = inMockWriteTx;
+        this.probe = TestProbe.apply(inSystem);
+        this.actorRef = inActorRef;
+        this.system = inSystem;
+    }
+
+    @Test
+    public void testPut() {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, NODE);
+        actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(mockWriteTx).put(STORE, PATH, NODE);
+    }
+
+    @Test
+    public void testMerge() {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, NODE);
+        actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(mockWriteTx).merge(STORE, PATH, NODE);
+    }
+
+    @Test
+    public void testDelete() {
+        actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
+        verify(mockWriteTx).delete(STORE, PATH);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        when(mockWriteTx.cancel()).thenReturn(true);
+        actorRef.tell(new CancelRequest(), probe.ref());
+
+        verify(mockWriteTx).cancel();
+        probe.expectMsg(true);
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        when(mockWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+        actorRef.tell(new SubmitRequest(), probe.ref());
+
+        verify(mockWriteTx).submit();
+        probe.expectMsgClass(Success.class);
+    }
+
+    @Test
+    public void testSubmitFail() throws Exception {
+        final RpcError rpcError =
+                RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
+        final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
+        when(mockWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new SubmitRequest(), probe.ref());
+
+        verify(mockWriteTx).submit();
+        final Failure response = probe.expectMsgClass(Failure.class);
+        assertEquals(cause, response.cause());
+    }
+
+    @Test
+    public void testIdleTimeout() throws Exception {
+        final TestProbe testProbe = new TestProbe(system);
+        testProbe.watch(actorRef);
+        verify(mockWriteTx, timeout(3000)).cancel();
+        testProbe.expectTerminated(actorRef, TIMEOUT.duration());
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransactionTest.java
deleted file mode 100644 (file)
index 12adb7c..0000000
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.netconf.api.DocumentedException;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-
-public class ProxyReadTransactionTest {
-    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
-    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
-
-    private ActorSystem system;
-    private TestProbe masterActor;
-    private ContainerNode node;
-    private ProxyReadTransaction tx;
-
-    @Before
-    public void setUp() throws Exception {
-        system = ActorSystem.apply();
-        masterActor = new TestProbe(system);
-        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
-        node = Builders.containerBuilder()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
-                .build();
-        tx = new ProxyReadTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system, null, true);
-    }
-
-    @Test
-    public void testRead() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
-        masterActor.expectMsgClass(ReadRequest.class);
-        masterActor.reply(new NormalizedNodeMessage(PATH, node));
-        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
-        Assert.assertTrue(result.isPresent());
-        Assert.assertEquals(node, result.get());
-    }
-
-    @Test
-    public void testReadEmpty() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
-        masterActor.expectMsgClass(ReadRequest.class);
-        masterActor.reply(new EmptyReadResponse());
-        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
-        Assert.assertFalse(result.isPresent());
-    }
-
-    @Test(expected = ReadFailedException.class)
-    public void testReadFail() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
-        masterActor.expectMsgClass(ReadRequest.class);
-        masterActor.reply(new RuntimeException("fail"));
-        read.checkedGet();
-    }
-
-    @Test
-    public void testExists() throws Exception {
-        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
-        masterActor.expectMsgClass(ExistsRequest.class);
-        masterActor.reply(true);
-        final Boolean result = read.checkedGet();
-        Assert.assertTrue(result);
-    }
-
-    @Test(expected = ReadFailedException.class)
-    public void testExistsFail() throws Exception {
-        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
-        masterActor.expectMsgClass(ExistsRequest.class);
-        masterActor.reply(new RuntimeException("fail"));
-        read.checkedGet();
-    }
-
-    @Test
-    public void testMasterDownRead() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
-        masterActor.expectMsgClass(ReadRequest.class);
-        //master doesn't reply
-        try {
-            read.checkedGet();
-            Assert.fail("Exception should be thrown");
-        } catch (final ReadFailedException e) {
-            final Throwable cause = e.getCause();
-            Assert.assertTrue(cause instanceof DocumentedException);
-            final DocumentedException de = (DocumentedException) cause;
-            Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
-            Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
-            Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
-        }
-    }
-
-}
index 54b887cbb8319c7fe5f14a3ac4c36867bcc61db6..434be24a6d1d0cd658dd8e2be2a3db1a19bfe674 100644 (file)
@@ -8,18 +8,26 @@
 
 package org.opendaylight.netconf.topology.singleton.impl.tx;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.dispatch.Futures;
+import akka.pattern.AskTimeoutException;
 import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
+import java.util.concurrent.TimeoutException;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -35,202 +43,366 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsR
 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 public class ProxyReadWriteTransactionTest {
+    private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS);
+    private static final RemoteDeviceId DEVICE_ID =
+            new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
     private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
     private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
 
-    private ActorSystem system;
+    private static ActorSystem system = ActorSystem.apply();
     private TestProbe masterActor;
     private ContainerNode node;
-    private ProxyReadWriteTransaction tx;
 
     @Before
-    public void setUp() throws Exception {
-        system = ActorSystem.apply();
+    public void setUp() {
         masterActor = new TestProbe(system);
-        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
         node = Builders.containerBuilder()
                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
                 .build();
-        tx = new ProxyReadWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
     }
 
-    @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system, null, true);
+    @AfterClass
+    public static void staticTearDown() {
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
+    }
+
+    private ProxyReadWriteTransaction newSuccessfulProxyTx() {
+        return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS));
+    }
+
+    private ProxyReadWriteTransaction newSuccessfulProxyTx(Timeout timeout) {
+        return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()),
+                system.dispatcher(), timeout);
     }
 
     @Test
-    public void testCancel() throws Exception {
-        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+    public void testCancel() {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        tx.cancel();
         masterActor.expectMsgClass(CancelRequest.class);
-        masterActor.reply(true);
-        Assert.assertTrue(submit.get());
+        masterActor.reply(Boolean.TRUE);
     }
 
     @Test
-    public void testCancelSubmitted() throws Exception {
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-        masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submitFuture.checkedGet();
-        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
-        masterActor.expectNoMsg();
-        Assert.assertFalse(submit.get());
+    public void testCommit() throws InterruptedException, ExecutionException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+        commit(tx);
     }
 
     @Test
-    public void testSubmit() throws Exception {
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-        masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submitFuture.checkedGet();
+    public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+        commit(tx);
+        assertFalse(tx.cancel());
     }
 
     @Test
-    public void testDoubleSubmit() throws Exception {
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-        masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submitFuture.checkedGet();
+    public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        commit(tx);
         try {
-            tx.submit().checkedGet();
-            Assert.fail("Should throw IllegalStateException");
+            tx.commit();
+            fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
+            masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
         }
     }
 
     @Test
-    public void testDelete() throws Exception {
+    public void testDelete() {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
         tx.delete(STORE, PATH);
-        masterActor.expectMsgClass(DeleteRequest.class);
+        final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class);
+        assertEquals(STORE, deleteRequest.getStore());
+        assertEquals(PATH, deleteRequest.getPath());
     }
 
     @Test
-    public void testDeleteClosed() throws Exception {
-        submit();
+    public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        commit(tx);
         try {
             tx.delete(STORE, PATH);
-            Assert.fail("Should throw IllegalStateException");
+            fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
+            masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
         }
     }
 
     @Test
-    public void testPut() throws Exception {
+    public void testPut() {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
         tx.put(STORE, PATH, node);
-        masterActor.expectMsgClass(PutRequest.class);
+        final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class);
+        assertEquals(STORE, putRequest.getStore());
+        assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier());
+        assertEquals(node, putRequest.getNormalizedNodeMessage().getNode());
     }
 
     @Test
-    public void testPutClosed() throws Exception {
-        submit();
+    public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        commit(tx);
         try {
             tx.put(STORE, PATH, node);
-            Assert.fail("Should throw IllegalStateException");
+            fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
+            masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
         }
     }
 
     @Test
-    public void testMerge() throws Exception {
+    public void testMerge() {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
         tx.merge(STORE, PATH, node);
-        masterActor.expectMsgClass(MergeRequest.class);
+        final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class);
+        assertEquals(STORE, mergeRequest.getStore());
+        assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier());
+        assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode());
     }
 
     @Test
-    public void testMergeClosed() throws Exception {
-        submit();
+    public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        commit(tx);
         try {
             tx.merge(STORE, PATH, node);
-            Assert.fail("Should throw IllegalStateException");
+            fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
+            masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
         }
     }
 
-    @Test
-    public void testGetIdentifier() throws Exception {
-        Assert.assertEquals(tx, tx.getIdentifier());
-    }
-
-    private void submit() throws TransactionCommitFailedException {
-        final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+    private void commit(ProxyReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final ListenableFuture<?> submit = tx.commit();
         masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submit.checkedGet();
+        masterActor.reply(new Success(null));
+        submit.get(5, TimeUnit.SECONDS);
     }
 
     @Test
     public void testRead() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
-        masterActor.expectMsgClass(ReadRequest.class);
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
+        final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class);
+        assertEquals(STORE, readRequest.getStore());
+        assertEquals(PATH, readRequest.getPath());
+
         masterActor.reply(new NormalizedNodeMessage(PATH, node));
-        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
-        Assert.assertTrue(result.isPresent());
-        Assert.assertEquals(node, result.get());
+        final Optional<NormalizedNode<?, ?>> result = read.get(5, TimeUnit.SECONDS);
+        assertTrue(result.isPresent());
+        assertEquals(node, result.get());
     }
 
     @Test
     public void testReadEmpty() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
         masterActor.expectMsgClass(ReadRequest.class);
         masterActor.reply(new EmptyReadResponse());
-        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
-        Assert.assertFalse(result.isPresent());
+        final Optional<NormalizedNode<?, ?>> result = read.get(5, TimeUnit.SECONDS);
+        assertFalse(result.isPresent());
     }
 
-    @Test(expected = ReadFailedException.class)
-    public void testReadFail() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+    @Test
+    public void testReadFailure() throws InterruptedException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
         masterActor.expectMsgClass(ReadRequest.class);
-        masterActor.reply(new RuntimeException("fail"));
-        read.checkedGet();
+        final RuntimeException mockEx = new RuntimeException("fail");
+        masterActor.reply(new Failure(mockEx));
+
+        try {
+            read.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+            assertEquals(mockEx, cause.getCause());
+        }
     }
 
     @Test
     public void testExists() throws Exception {
-        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
+        final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class);
+        assertEquals(STORE, existsRequest.getStore());
+        assertEquals(PATH, existsRequest.getPath());
+
+        masterActor.reply(Boolean.TRUE);
+        final Boolean result = read.get(5, TimeUnit.SECONDS);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testExistsFailure() throws InterruptedException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+        final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
         masterActor.expectMsgClass(ExistsRequest.class);
-        masterActor.reply(true);
-        final Boolean result = read.checkedGet();
-        Assert.assertTrue(result);
+        final RuntimeException mockEx = new RuntimeException("fail");
+        masterActor.reply(new Failure(mockEx));
+
+        try {
+            read.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+            assertEquals(mockEx, cause.getCause());
+        }
     }
 
-    @Test(expected = ReadFailedException.class)
-    public void testExistsFail() throws Exception {
-        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+    @Test
+    public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException {
+        ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS));
+
+        ListenableFuture<?> future = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+
+        // master doesn't reply
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+            verifyDocumentedException(cause.getCause());
+        }
+
+        future = tx.exists(STORE, PATH);
         masterActor.expectMsgClass(ExistsRequest.class);
-        masterActor.reply(new RuntimeException("fail"));
-        read.checkedGet();
+
+        // master doesn't reply
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+            verifyDocumentedException(cause.getCause());
+        }
+
+        future = tx.commit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+
+        // master doesn't reply
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
+            verifyDocumentedException(cause.getCause());
+        }
     }
 
     @Test
-    public void testMasterDownRead() throws Exception {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+    public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException {
+        final Promise<Object> promise = Futures.promise();
+        ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(),
+                system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
+
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
+        final ListenableFuture<Boolean> exists = tx.exists(STORE, PATH);
+
+        tx.put(STORE, PATH, node);
+        tx.merge(STORE, PATH, node);
+        tx.delete(STORE, PATH);
+
+        final ListenableFuture<?> commit = tx.commit();
+
+        promise.success(masterActor.ref());
+
         masterActor.expectMsgClass(ReadRequest.class);
-        //master doesn't reply
+        masterActor.reply(new NormalizedNodeMessage(PATH, node));
+
+        masterActor.expectMsgClass(ExistsRequest.class);
+        masterActor.reply(Boolean.TRUE);
+
+        masterActor.expectMsgClass(PutRequest.class);
+        masterActor.expectMsgClass(MergeRequest.class);
+        masterActor.expectMsgClass(DeleteRequest.class);
+
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new Success(null));
+
+        read.get(5, TimeUnit.SECONDS).isPresent();
+        assertTrue(exists.get(5, TimeUnit.SECONDS));
+        commit.get(5, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException {
+        final AskTimeoutException mockEx = new AskTimeoutException("mock");
+        ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx),
+                system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
+
+        ListenableFuture<?> future = tx.read(STORE, PATH);
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+            assertEquals(mockEx, cause.getCause());
+        }
+
+        future = tx.exists(STORE, PATH);
         try {
-            read.checkedGet();
-            Assert.fail("Exception should be thrown");
-        } catch (final ReadFailedException e) {
-            final Throwable cause = e.getCause();
-            Assert.assertTrue(cause instanceof DocumentedException);
-            final DocumentedException de = (DocumentedException) cause;
-            Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
-            Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
-            Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+            future.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+            assertEquals(mockEx, cause.getCause());
         }
+
+        tx.put(STORE, PATH, node);
+        tx.merge(STORE, PATH, node);
+        tx.delete(STORE, PATH);
+
+        future = tx.commit();
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Exception should be thrown");
+        } catch (final ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
+            assertEquals(mockEx, cause.getCause());
+        }
+    }
+
+    private void verifyDocumentedException(Throwable cause) {
+        assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException);
+        final DocumentedException de = (DocumentedException) cause;
+        assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
+        assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
+        assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
     }
 }
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransactionTest.java
deleted file mode 100644 (file)
index 159bdc1..0000000
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-
-public class ProxyWriteTransactionTest {
-    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
-    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
-
-    private ActorSystem system;
-    private TestProbe masterActor;
-    private ContainerNode node;
-    private ProxyWriteTransaction tx;
-
-    @Before
-    public void setUp() throws Exception {
-        system = ActorSystem.apply();
-        masterActor = new TestProbe(system);
-        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
-        node = Builders.containerBuilder()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
-                .build();
-        tx = new ProxyWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system, null, true);
-    }
-
-    @Test
-    public void testCancel() throws Exception {
-        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
-        masterActor.expectMsgClass(CancelRequest.class);
-        masterActor.reply(true);
-        Assert.assertTrue(submit.get());
-    }
-
-    @Test
-    public void testCancelSubmitted() throws Exception {
-        final ListenableFuture<Void> submitFuture = tx.submit();
-        masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submitFuture.get();
-        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
-        masterActor.expectNoMsg();
-        Assert.assertFalse(submit.get());
-    }
-
-    @Test
-    public void testSubmit() throws Exception {
-        final ListenableFuture<Void> submitFuture = tx.submit();
-        masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submitFuture.get();
-    }
-
-    @Test
-    public void testDoubleSubmit() throws Exception {
-        final ListenableFuture<Void> submitFuture = tx.submit();
-        masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submitFuture.get();
-        try {
-            tx.submit().checkedGet();
-            Assert.fail("Should throw IllegalStateException");
-        } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
-        }
-    }
-
-    @Test
-    public void testDelete() throws Exception {
-        tx.delete(STORE, PATH);
-        masterActor.expectMsgClass(DeleteRequest.class);
-    }
-
-    @Test
-    public void testDeleteClosed() throws Exception {
-        submit();
-        try {
-            tx.delete(STORE, PATH);
-            Assert.fail("Should throw IllegalStateException");
-        } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
-        }
-    }
-
-    @Test
-    public void testPut() throws Exception {
-        tx.put(STORE, PATH, node);
-        masterActor.expectMsgClass(PutRequest.class);
-    }
-
-    @Test
-    public void testPutClosed() throws Exception {
-        submit();
-        try {
-            tx.put(STORE, PATH, node);
-            Assert.fail("Should throw IllegalStateException");
-        } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
-        }
-    }
-
-    @Test
-    public void testMerge() throws Exception {
-        tx.merge(STORE, PATH, node);
-        masterActor.expectMsgClass(MergeRequest.class);
-    }
-
-    @Test
-    public void testMergeClosed() throws Exception {
-        submit();
-        try {
-            tx.merge(STORE, PATH, node);
-            Assert.fail("Should throw IllegalStateException");
-        } catch (final IllegalStateException e) {
-            masterActor.expectNoMsg();
-        }
-    }
-
-    @Test
-    public void testGetIdentifier() throws Exception {
-        Assert.assertEquals(tx, tx.getIdentifier());
-    }
-
-    private void submit() throws TransactionCommitFailedException {
-        final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
-        masterActor.expectMsgClass(SubmitRequest.class);
-        masterActor.reply(new SubmitReply());
-        submit.checkedGet();
-    }
-
-}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java
deleted file mode 100644 (file)
index 825a812..0000000
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mock;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
-import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
-import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
-import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-public class ReadOnlyTransactionTest {
-    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
-    private static final int TIMEOUT_SEC = 5;
-    private static ActorSystem system;
-
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
-    private ActorRef masterRef;
-    private ProxyDOMDataBroker slaveDataBroker;
-    private List<SourceIdentifier> sourceIdentifiers;
-    private YangInstanceIdentifier instanceIdentifier;
-    private LogicalDatastoreType storeType;
-    @Mock
-    private DOMDataBroker deviceDataBroker;
-    @Mock
-    private DOMDataReadOnlyTransaction readTx;
-    @Mock
-    private DOMRpcService domRpcService;
-    @Mock
-    private DOMMountPointService mountPointService;
-
-
-    @Before
-    public void setup() throws Exception {
-        initMocks(this);
-
-        system = ActorSystem.create();
-
-        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
-                new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
-
-        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
-        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
-                DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
-
-        masterRef = TestActorRef.create(system, props, "master_read");
-
-        sourceIdentifiers = Lists.newArrayList();
-
-        //device read tx
-        doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
-
-        // Create slave data broker for testing proxy
-        slaveDataBroker =
-                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
-        initializeDataTest();
-        instanceIdentifier = YangInstanceIdentifier.EMPTY;
-        storeType = LogicalDatastoreType.CONFIGURATION;
-    }
-
-    @After
-    public void teardown() {
-        JavaTestKit.shutdownActorSystem(system, null, true);
-        system = null;
-    }
-
-    @Test
-    public void testRead() throws Exception {
-        // Message: NormalizedNodeMessage
-        final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
-                .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNormalizedNodeMessage =
-                Futures.immediateCheckedFuture(Optional.of(outputNode));
-        doReturn(resultNormalizedNodeMessage).when(readTx).read(storeType, instanceIdentifier);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNodeMessageResponse =
-                slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier);
-
-        final Optional<NormalizedNode<?, ?>> resultNodeMessage =
-                resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertTrue(resultNodeMessage.isPresent());
-        assertEquals(resultNodeMessage.get(), outputNode);
-    }
-
-    @Test
-    public void testReadEmpty() throws Exception {
-        // Message: EmptyReadResponse
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmpty =
-                Futures.immediateCheckedFuture(Optional.absent());
-        doReturn(resultEmpty).when(readTx).read(storeType, instanceIdentifier);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmptyResponse =
-                slaveDataBroker.newReadOnlyTransaction().read(storeType,
-                        instanceIdentifier);
-
-        final Optional<NormalizedNode<?, ?>> resultEmptyMessage =
-                resultEmptyResponse.get(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(resultEmptyMessage, Optional.absent());
-    }
-
-    @Test
-    public void testReadFail() throws Exception {
-        // Message: Throwable
-        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowable =
-                Futures.immediateFailedCheckedFuture(readFailedException);
-
-        doReturn(resultThrowable).when(readTx).read(storeType, instanceIdentifier);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowableResponse =
-                slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier);
-
-        exception.expect(ReadFailedException.class);
-        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-    }
-
-    @Test
-    public void testExist() throws Exception {
-        // Message: True
-        final CheckedFuture<Boolean, ReadFailedException> resultTrue =
-                Futures.immediateCheckedFuture(true);
-        doReturn(resultTrue).when(readTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> trueResponse =
-                slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier);
-
-        final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(true, trueMessage);
-    }
-
-    @Test
-    public void testExistsNull() throws Exception {
-        // Message: False, result null
-        final CheckedFuture<Boolean, ReadFailedException> resultNull = Futures.immediateCheckedFuture(null);
-        doReturn(resultNull).when(readTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> nullResponse =
-                slaveDataBroker.newReadOnlyTransaction().exists(storeType,
-                        instanceIdentifier);
-
-        final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(false, nullFalseMessage);
-    }
-
-    @Test
-    public void testExistsFalse() throws Exception {
-        // Message: False
-        final CheckedFuture<Boolean, ReadFailedException> resultFalse = Futures.immediateCheckedFuture(false);
-        doReturn(resultFalse).when(readTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> falseResponse =
-                slaveDataBroker.newReadOnlyTransaction().exists(storeType,
-                        instanceIdentifier);
-
-        final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(false, falseMessage);
-    }
-
-    @Test
-    public void testExistsFail() throws Exception {
-        // Message: Throwable
-        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
-        final CheckedFuture<Boolean, ReadFailedException> resultThrowable =
-                Futures.immediateFailedCheckedFuture(readFailedException);
-        doReturn(resultThrowable).when(readTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> resultThrowableResponse =
-                slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier);
-
-        exception.expect(ReadFailedException.class);
-        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-    }
-
-    private void initializeDataTest() throws Exception {
-        final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
-                                domRpcService), TIMEOUT);
-
-        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
-
-        assertTrue(success instanceof MasterActorDataInitialized);
-    }
-}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java
deleted file mode 100644 (file)
index 441a4ca..0000000
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mock;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
-import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
-import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
-import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-public class ReadWriteTransactionTest {
-    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
-    private static final int TIMEOUT_SEC = 5;
-    private static ActorSystem system;
-
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
-    @Mock
-    private DOMDataBroker deviceDataBroker;
-    @Mock
-    private DOMDataReadWriteTransaction readWriteTx;
-    @Mock
-    private DOMRpcService domRpcService;
-    @Mock
-    private DOMMountPointService mountPointService;
-    private ActorRef masterRef;
-    private ProxyDOMDataBroker slaveDataBroker;
-    private List<SourceIdentifier> sourceIdentifiers;
-    private NormalizedNode<?, ?> testNode;
-    private YangInstanceIdentifier instanceIdentifier;
-    private LogicalDatastoreType storeType;
-
-    @Before
-    public void setup() throws Exception {
-        initMocks(this);
-
-        system = ActorSystem.create();
-
-        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
-                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
-
-        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
-        doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
-        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
-                DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
-
-        masterRef = TestActorRef.create(system, props, "master_read");
-
-        sourceIdentifiers = Lists.newArrayList();
-
-        doReturn(readWriteTx).when(deviceDataBroker).newReadWriteTransaction();
-        doNothing().when(readWriteTx).put(storeType, instanceIdentifier, testNode);
-        doNothing().when(readWriteTx).merge(storeType, instanceIdentifier, testNode);
-        doNothing().when(readWriteTx).delete(storeType, instanceIdentifier);
-
-        // Create slave data broker for testing proxy
-        slaveDataBroker =
-                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
-        initializeDataTest();
-        testNode = ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
-                .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
-        instanceIdentifier = YangInstanceIdentifier.EMPTY;
-        storeType = LogicalDatastoreType.CONFIGURATION;
-    }
-
-    @After
-    public void teardown() {
-        JavaTestKit.shutdownActorSystem(system, null, true);
-        system = null;
-    }
-
-    @Test
-    public void testPut() throws Exception {
-        // Test of invoking put on master through slave proxy
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-        wTx.put(storeType, instanceIdentifier, testNode);
-
-        verify(readWriteTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
-
-        wTx.cancel();
-    }
-
-    @Test
-    public void testMerge() throws Exception {
-        // Test of invoking merge on master through slave proxy
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-        wTx.merge(storeType, instanceIdentifier, testNode);
-
-        verify(readWriteTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
-
-        wTx.cancel();
-    }
-
-    @Test
-    public void testDelete() throws Exception {
-        // Test of invoking delete on master through slave proxy
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
-        wTx.cancel();
-
-        verify(readWriteTx, timeout(2000)).delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
-    }
-
-    @Test
-    public void testSubmit() throws Exception {
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
-        doReturn(resultSubmit).when(readWriteTx).submit();
-
-        // Without Tx
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
-
-        final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertNull(result);
-    }
-
-    @Test
-    public void testSubmitWithOperation() throws Exception {
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx =
-                Futures.immediateCheckedFuture(null);
-        doReturn(resultSubmitTx).when(readWriteTx).submit();
-        // With Tx
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION,
-                YangInstanceIdentifier.EMPTY);
-
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
-
-        final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertNull(resultTx);
-    }
-
-    @Test
-    public void testSubmitFail() throws Exception {
-        final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
-        final CheckedFuture<Void, TransactionCommitFailedException> resultThrowable =
-                Futures.immediateFailedCheckedFuture(throwable);
-        doReturn(resultThrowable).when(readWriteTx).submit();
-
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION,
-                YangInstanceIdentifier.EMPTY);
-        final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
-                wTx.submit();
-        exception.expect(TransactionCommitFailedException.class);
-        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-    }
-
-    @Test
-    public void testCancel() throws Exception {
-        doReturn(true).when(readWriteTx).cancel();
-
-        // Without Tx
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-        final Boolean resultFalseNoTx = wTx.cancel();
-        assertEquals(true, resultFalseNoTx);
-    }
-
-    @Test
-    public void testCancelWithOperation() throws Exception {
-        doReturn(true).when(readWriteTx).cancel();
-
-        // With Tx, readWriteTx test
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION,
-                YangInstanceIdentifier.EMPTY);
-
-        final Boolean resultTrue = wTx.cancel();
-        assertEquals(true, resultTrue);
-
-        final Boolean resultFalse = wTx.cancel();
-        assertEquals(false, resultFalse);
-    }
-
-    @Test
-    public void testRead() throws Exception {
-        // Message: NormalizedNodeMessage
-        final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
-                .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNormalizedNodeMessage =
-                Futures.immediateCheckedFuture(Optional.of(outputNode));
-        doReturn(resultNormalizedNodeMessage).when(readWriteTx).read(storeType, instanceIdentifier);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNodeMessageResponse =
-                slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
-
-        final Optional<NormalizedNode<?, ?>> resultNodeMessage =
-                resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertTrue(resultNodeMessage.isPresent());
-        assertEquals(resultNodeMessage.get(), outputNode);
-    }
-
-    @Test
-    public void testReadEmpty() throws Exception {
-        // Message: EmptyReadResponse
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmpty =
-                Futures.immediateCheckedFuture(Optional.absent());
-        doReturn(resultEmpty).when(readWriteTx).read(storeType, instanceIdentifier);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmptyResponse =
-                slaveDataBroker.newReadWriteTransaction().read(storeType,
-                        instanceIdentifier);
-
-        final Optional<NormalizedNode<?, ?>> resultEmptyMessage =
-                resultEmptyResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(resultEmptyMessage, Optional.absent());
-    }
-
-    @Test
-    public void testReadFail() throws Exception {
-        // Message: Throwable
-        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowable =
-                Futures.immediateFailedCheckedFuture(readFailedException);
-
-        doReturn(resultThrowable).when(readWriteTx).read(storeType, instanceIdentifier);
-
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowableResponse =
-                slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
-
-        exception.expect(ReadFailedException.class);
-        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-    }
-
-    @Test
-    public void testExist() throws Exception {
-        // Message: True
-        final CheckedFuture<Boolean, ReadFailedException> resultTrue =
-                Futures.immediateCheckedFuture(true);
-        doReturn(resultTrue).when(readWriteTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> trueResponse =
-                slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
-
-        final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(true, trueMessage);
-    }
-
-    @Test
-    public void testExistsNull() throws Exception {
-        // Message: False, result null
-        final CheckedFuture<Boolean, ReadFailedException> resultNull = Futures.immediateCheckedFuture(null);
-        doReturn(resultNull).when(readWriteTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> nullResponse =
-                slaveDataBroker.newReadWriteTransaction().exists(storeType,
-                        instanceIdentifier);
-
-        final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(false, nullFalseMessage);
-    }
-
-    @Test
-    public void testExistsFalse() throws Exception {
-        // Message: False
-        final CheckedFuture<Boolean, ReadFailedException> resultFalse = Futures.immediateCheckedFuture(false);
-        doReturn(resultFalse).when(readWriteTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> falseResponse =
-                slaveDataBroker.newReadWriteTransaction().exists(storeType,
-                        instanceIdentifier);
-
-        final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertEquals(false, falseMessage);
-    }
-
-    @Test
-    public void testExistsFail() throws Exception {
-        // Message: Throwable
-        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
-        final CheckedFuture<Boolean, ReadFailedException> resultThrowable =
-                Futures.immediateFailedCheckedFuture(readFailedException);
-        doReturn(resultThrowable).when(readWriteTx).exists(storeType, instanceIdentifier);
-
-        final CheckedFuture<Boolean, ReadFailedException> resultThrowableResponse =
-                slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
-
-        exception.expect(ReadFailedException.class);
-        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-    }
-
-    private void initializeDataTest() throws Exception {
-        final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
-                        domRpcService), TIMEOUT);
-
-        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
-
-        assertTrue(success instanceof MasterActorDataInitialized);
-    }
-
-}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java
deleted file mode 100644 (file)
index a13c953..0000000
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.collect.Lists;
-import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mock;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
-import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
-import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
-import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-public class WriteOnlyTransactionTest {
-    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
-    private static final int TIMEOUT_SEC = 5;
-    private static ActorSystem system;
-
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
-    @Mock
-    private DOMDataBroker deviceDataBroker;
-    @Mock
-    private DOMDataWriteTransaction writeTx;
-    @Mock
-    private DOMRpcService domRpcService;
-    @Mock
-    private DOMMountPointService mountPointService;
-    private ActorRef masterRef;
-    private ProxyDOMDataBroker slaveDataBroker;
-    private List<SourceIdentifier> sourceIdentifiers;
-    private NormalizedNode<?, ?> testNode;
-    private YangInstanceIdentifier instanceIdentifier;
-    private LogicalDatastoreType storeType;
-
-    @Before
-    public void setup() throws Exception {
-        initMocks(this);
-
-        system = ActorSystem.create();
-
-        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
-                new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
-
-        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
-        doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
-        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
-                DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
-
-        masterRef = TestActorRef.create(system, props, "master_read");
-
-        sourceIdentifiers = Lists.newArrayList();
-
-        final DOMDataReadOnlyTransaction readTx = mock(DOMDataReadOnlyTransaction.class);
-
-        doReturn(writeTx).when(deviceDataBroker).newWriteOnlyTransaction();
-        doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
-        doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
-        doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
-        doNothing().when(writeTx).delete(storeType, instanceIdentifier);
-
-        // Create slave data broker for testing proxy
-        slaveDataBroker =
-                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
-        initializeDataTest();
-        testNode = ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
-                .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
-        instanceIdentifier = YangInstanceIdentifier.EMPTY;
-        storeType = LogicalDatastoreType.CONFIGURATION;
-    }
-
-    @After
-    public void teardown() {
-        JavaTestKit.shutdownActorSystem(system, null, true);
-        system = null;
-    }
-
-    @Test
-    public void testPut() throws Exception {
-        // Test of invoking put on master through slave proxy
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        wTx.put(storeType, instanceIdentifier, testNode);
-
-        verify(writeTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
-
-        wTx.cancel();
-    }
-
-    @Test
-    public void testMerge() throws Exception {
-        // Test of invoking merge on master through slave proxy
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        wTx.merge(storeType, instanceIdentifier, testNode);
-
-        verify(writeTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
-
-        wTx.cancel();
-    }
-
-    @Test
-    public void testDelete() throws Exception {
-        // Test of invoking delete on master through slave proxy
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
-        wTx.cancel();
-
-        verify(writeTx, timeout(2000)).delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
-    }
-
-    @Test
-    public void testSubmit() throws Exception {
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
-        doReturn(resultSubmit).when(writeTx).submit();
-
-        // Without Tx
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
-
-        final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertNull(result);
-    }
-
-    @Test
-    public void testSubmitWithOperation() throws Exception {
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx =
-                Futures.immediateCheckedFuture(null);
-        doReturn(resultSubmitTx).when(writeTx).submit();
-        // With Tx
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION,
-                YangInstanceIdentifier.EMPTY);
-
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
-
-        final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
-        assertNull(resultTx);
-    }
-
-    @Test
-    public void testSubmitFail() throws Exception {
-        final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
-        final CheckedFuture<Void, TransactionCommitFailedException> resultThrowable =
-                Futures.immediateFailedCheckedFuture(throwable);
-        doReturn(resultThrowable).when(writeTx).submit();
-
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION,
-                YangInstanceIdentifier.EMPTY);
-        final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
-                wTx.submit();
-        exception.expect(TransactionCommitFailedException.class);
-        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-    }
-
-    @Test
-    public void testCancel() throws Exception {
-        doReturn(true).when(writeTx).cancel();
-
-        // Without Tx
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        final Boolean resultFalseNoTx = wTx.cancel();
-        assertEquals(true, resultFalseNoTx);
-    }
-
-    @Test
-    public void testCancelWithOperation() throws Exception {
-        doReturn(true).when(writeTx).cancel();
-
-        // With Tx, readWriteTx test
-        final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.CONFIGURATION,
-                YangInstanceIdentifier.EMPTY);
-
-        final Boolean resultTrue = wTx.cancel();
-        assertEquals(true, resultTrue);
-
-        final Boolean resultFalse = wTx.cancel();
-        assertEquals(false, resultFalse);
-    }
-
-    private void initializeDataTest() throws Exception {
-        final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
-                                domRpcService), TIMEOUT);
-
-        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
-
-        assertTrue(success instanceof MasterActorDataInitialized);
-    }
-
-}