Bug 8289 - 409 in cluster restperfclient test 56/56056/8
authormiroslav.kovac <miroslav.kovac@pantheon.tech>
Wed, 26 Apr 2017 09:18:06 +0000 (11:18 +0200)
committerMiroslav Kovac <miroslav.kovac@pantheon.tech>
Fri, 28 Apr 2017 07:27:00 +0000 (07:27 +0000)
To ensure message ordering we need ReadWriteTransactionActor. This
way we are sure that all reads and writes will be executed in
the order that they are sent.

Change-Id: I7a76f6b4d9e6e348ec0d58abe24e56dfeae66f24
Signed-off-by: miroslav.kovac <miroslav.kovac@pantheon.tech>
26 files changed:
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMDataBroker.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadTransactionActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActor.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadActorMessage.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/WriteActorMessage.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java [new file with mode: 0644]

index 416d9bdfd2ac0902a1f256349114bb5c9081a258..d5c9c25a17366aa6887c67762245d89acf4101b8 100644 (file)
@@ -25,12 +25,14 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadWriteTransaction;
 import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -49,7 +51,7 @@ public class ProxyDOMDataBroker implements DOMDataBroker {
      * @param actorSystem system
      * @param id          id
      * @param masterNode  {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref
-     * @param askTimeout ask timeout
+     * @param askTimeout  ask timeout
      */
     public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
                               final ActorRef masterNode, final Timeout askTimeout) {
@@ -77,7 +79,18 @@ public class ProxyDOMDataBroker implements DOMDataBroker {
 
     @Override
     public DOMDataReadWriteTransaction newReadWriteTransaction() {
-        return new ReadWriteTx(newReadOnlyTransaction(), newWriteOnlyTransaction());
+        final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadWriteTransactionRequest(), askTimeout);
+        try {
+            final Object msg = Await.result(txActorFuture, askTimeout.duration());
+            if (msg instanceof Throwable) {
+                throw (Throwable) msg;
+            }
+            Preconditions.checkState(msg instanceof NewReadWriteTransactionReply);
+            final NewReadWriteTransactionReply reply = (NewReadWriteTransactionReply) msg;
+            return new ProxyReadWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+        } catch (final Throwable t) {
+            throw new IllegalStateException("Can't create ProxyReadTransaction", t);
+        }
     }
 
     @Override
index fbd0422b692c66129f336dbb0f38bd8440ef31fe..46eabd0c3fae848fcb8dae9d316b1771f6bd3d85 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProv
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
@@ -47,6 +48,8 @@ import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -146,6 +149,14 @@ public class NetconfNodeActor extends UntypedActor {
                 sender().tell(t, self());
             }
 
+        } else if (message instanceof NewReadWriteTransactionRequest) {
+            try {
+                final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
+                final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
+                sender().tell(new NewReadWriteTransactionReply(txActor), self());
+            } catch (final Throwable t) {
+                sender().tell(t, self());
+            }
         } else if (message instanceof InvokeRpcMessage) { // master
 
             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadAdapter.java
new file mode 100644 (file)
index 0000000..0ef08f0
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+class ReadAdapter {
+
+    private final DOMDataReadTransaction tx;
+
+    public ReadAdapter(final DOMDataReadTransaction tx) {
+        this.tx = tx;
+    }
+
+    public void handle(final Object message, final ActorRef sender, final ActorRef self) throws Throwable {
+        if (message instanceof ReadRequest) {
+
+            final ReadRequest readRequest = (ReadRequest) message;
+            final YangInstanceIdentifier path = readRequest.getPath();
+            final LogicalDatastoreType store = readRequest.getStore();
+            read(path, store, sender, self);
+
+        } else if (message instanceof ExistsRequest) {
+            final ExistsRequest readRequest = (ExistsRequest) message;
+            final YangInstanceIdentifier path = readRequest.getPath();
+            final LogicalDatastoreType store = readRequest.getStore();
+            exists(path, store, sender, self);
+        }
+    }
+
+    private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+                      final ActorRef self) {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(store, path);
+        Futures.addCallback(read, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+            @Override
+            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
+                if (!result.isPresent()) {
+                    sender.tell(new EmptyReadResponse(), self);
+                    return;
+                }
+                sender.tell(new NormalizedNodeMessage(path, result.get()), self);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                sender.tell(throwable, self);
+            }
+        });
+    }
+
+    private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+                        final ActorRef self) {
+        final CheckedFuture<Boolean, ReadFailedException> readFuture = tx.exists(store, path);
+        Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                if (result == null) {
+                    sender.tell(false, self);
+                } else {
+                    sender.tell(result, self);
+                }
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                sender.tell(throwable, self);
+            }
+        });
+    }
+}
index b6bf651ea277518e80753b77f177922fd94d69c1..777140cd4fc2974fdc4e9f6aad71d09a62ad3b7c 100644 (file)
@@ -8,30 +8,21 @@
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
-import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadActorMessage;
 
 /**
  * ReadTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
  */
 public class ReadTransactionActor extends UntypedActor {
 
-    private final DOMDataReadOnlyTransaction tx;
+    private final ReadAdapter readAdapter;
+
+    private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) {
+        readAdapter = new ReadAdapter(tx);
+    }
 
     /**
      * Creates new actor Props.
@@ -43,68 +34,13 @@ public class ReadTransactionActor extends UntypedActor {
         return Props.create(ReadTransactionActor.class, () -> new ReadTransactionActor(tx));
     }
 
-    private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) {
-        this.tx = tx;
-    }
-
     @Override
     public void onReceive(final Object message) throws Throwable {
-        if (message instanceof ReadRequest) {
-
-            final ReadRequest readRequest = (ReadRequest) message;
-            final YangInstanceIdentifier path = readRequest.getPath();
-            final LogicalDatastoreType store = readRequest.getStore();
-            read(path, store, sender(), self());
-
-        } else if (message instanceof ExistsRequest) {
-            final ExistsRequest readRequest = (ExistsRequest) message;
-            final YangInstanceIdentifier path = readRequest.getPath();
-            final LogicalDatastoreType store = readRequest.getStore();
-            exists(path, store, sender(), self());
-
+        if (message instanceof ReadActorMessage) {
+            readAdapter.handle(message, sender(), self());
         } else {
             unhandled(message);
         }
     }
 
-    private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
-                      final ActorRef self) {
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(store, path);
-        Futures.addCallback(read, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-
-            @Override
-            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
-                if (!result.isPresent()) {
-                    sender.tell(new EmptyReadResponse(), self);
-                    return;
-                }
-                sender.tell(new NormalizedNodeMessage(path, result.get()), self);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                sender.tell(throwable, self);
-            }
-        });
-    }
-
-    private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
-                        final ActorRef self) {
-        final CheckedFuture<Boolean, ReadFailedException> readFuture = tx.exists(store, path);
-        Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(final Boolean result) {
-                if (result == null) {
-                    sender.tell(false, self);
-                } else {
-                    sender.tell(result, self);
-                }
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                sender.tell(throwable, self);
-            }
-        });
-    }
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActor.java
new file mode 100644 (file)
index 0000000..1c263c0
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
+import akka.actor.UntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadActorMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+public class ReadWriteTransactionActor extends UntypedActor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTransactionActor.class);
+
+    private final DOMDataReadWriteTransaction tx;
+    private final long idleTimeout;
+    private final ReadAdapter readAdapter;
+    private final WriteAdapter writeAdapter;
+
+    private ReadWriteTransactionActor(final DOMDataReadWriteTransaction tx, final Duration idleTimeout) {
+        this.tx = tx;
+        this.idleTimeout = idleTimeout.toSeconds();
+        if (this.idleTimeout > 0) {
+            context().setReceiveTimeout(idleTimeout);
+        }
+        readAdapter = new ReadAdapter(tx);
+        writeAdapter = new WriteAdapter(tx);
+    }
+
+    /**
+     * Creates new actor Props.
+     *
+     * @param tx          delegate device read write transaction
+     * @param idleTimeout idle time in seconds, after which transaction is closed automatically
+     * @return props
+     */
+    static Props props(final DOMDataReadWriteTransaction tx, final Duration idleTimeout) {
+        return Props.create(ReadWriteTransactionActor.class, () -> new ReadWriteTransactionActor(tx, idleTimeout));
+    }
+
+    @Override
+    public void onReceive(final Object message) throws Throwable {
+        if (message instanceof ReadActorMessage) {
+            readAdapter.handle(message, sender(), self());
+        } else if (message instanceof WriteActorMessage) {
+            writeAdapter.handle(message, sender(), context(), self());
+        } else if (message instanceof ReceiveTimeout) {
+            LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
+                    idleTimeout);
+            tx.cancel();
+            context().stop(self());
+        } else {
+            unhandled(message);
+        }
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteAdapter.java
new file mode 100644 (file)
index 0000000..7e762bf
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+
+class WriteAdapter {
+    private final DOMDataWriteTransaction tx;
+
+    public WriteAdapter(final DOMDataWriteTransaction tx) {
+        this.tx = tx;
+    }
+
+    private void cancel(final ActorContext context, final ActorRef sender, final ActorRef self) {
+        final boolean cancelled = tx.cancel();
+        sender.tell(cancelled, self);
+        context.stop(self);
+    }
+
+    private void submit(final ActorRef requester, final ActorRef self, final ActorContext context) {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        context.stop(self);
+        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                requester.tell(new SubmitReply(), self);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                requester.tell(throwable, self);
+            }
+        });
+    }
+
+    public void handle(final Object message, final ActorRef sender, final ActorContext context, final ActorRef self) {
+        if (message instanceof MergeRequest) {
+            final MergeRequest mergeRequest = (MergeRequest) message;
+            final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
+            tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
+        } else if (message instanceof PutRequest) {
+            final PutRequest putRequest = (PutRequest) message;
+            final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
+            tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
+        } else if (message instanceof DeleteRequest) {
+            final DeleteRequest deleteRequest = (DeleteRequest) message;
+            tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
+        } else if (message instanceof CancelRequest) {
+            cancel(context, sender, self);
+        } else if (message instanceof SubmitRequest) {
+            submit(sender, self, context);
+        }
+    }
+}
index 008559ec9bfc39b4c0b930f07b5c836ec7bb50c2..8a1f53af5297a0772d4e8ca3f80656c1fbe532a4 100644 (file)
@@ -8,24 +8,12 @@
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
-import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.actor.UntypedActor;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -39,11 +27,21 @@ public class WriteTransactionActor extends UntypedActor {
 
     private final DOMDataWriteTransaction tx;
     private final long idleTimeout;
+    private final WriteAdapter writeAdapter;
+
+    private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+        this.tx = tx;
+        this.idleTimeout = idleTimeout.toSeconds();
+        if (this.idleTimeout > 0) {
+            context().setReceiveTimeout(idleTimeout);
+        }
+        writeAdapter = new WriteAdapter(tx);
+    }
 
     /**
      * Creates new actor Props.
      *
-     * @param tx delegate device write transaction
+     * @param tx          delegate device write transaction
      * @param idleTimeout idle time in seconds, after which transaction is closed automatically
      * @return props
      */
@@ -51,31 +49,10 @@ public class WriteTransactionActor extends UntypedActor {
         return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
     }
 
-    private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
-        this.tx = tx;
-        this.idleTimeout = idleTimeout.toSeconds();
-        if (this.idleTimeout > 0) {
-            context().setReceiveTimeout(idleTimeout);
-        }
-    }
-
     @Override
     public void onReceive(final Object message) throws Throwable {
-        if (message instanceof MergeRequest) {
-            final MergeRequest mergeRequest = (MergeRequest) message;
-            final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
-            tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
-        } else if (message instanceof PutRequest) {
-            final PutRequest putRequest = (PutRequest) message;
-            final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
-            tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
-        } else if (message instanceof DeleteRequest) {
-            final DeleteRequest deleteRequest = (DeleteRequest) message;
-            tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
-        } else if (message instanceof CancelRequest) {
-            cancel();
-        } else if (message instanceof SubmitRequest) {
-            submit(sender(), self());
+        if (message instanceof WriteActorMessage) {
+            writeAdapter.handle(message, sender(), context(), self());
         } else if (message instanceof ReceiveTimeout) {
             LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
                     idleTimeout);
@@ -86,25 +63,5 @@ public class WriteTransactionActor extends UntypedActor {
         }
     }
 
-    private void cancel() {
-        final boolean cancelled = tx.cancel();
-        sender().tell(cancelled, self());
-        context().stop(self());
-    }
-
-    private void submit(final ActorRef requester, final ActorRef self) {
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-        context().stop(self);
-        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                requester.tell(new SubmitReply(), self);
-            }
 
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                requester.tell(throwable, self);
-            }
-        });
-    }
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadAdapter.java
new file mode 100644 (file)
index 0000000..b500e0d
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+class ProxyReadAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyReadAdapter.class);
+
+    private final ActorRef masterTxActor;
+    private final RemoteDeviceId id;
+    private final ActorSystem actorSystem;
+    private final Timeout askTimeout;
+
+    public ProxyReadAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+                            final Timeout askTimeout) {
+        this.masterTxActor = masterTxActor;
+        this.id = id;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    public void close() {
+        //noop
+    }
+
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+                                                                                   final YangInstanceIdentifier path) {
+        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
+
+        final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure,
+                                   final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) { // Error sended by master
+                    settableFuture.setException((Throwable) success);
+                    return;
+                }
+                if (success instanceof EmptyReadResponse) {
+                    settableFuture.set(Optional.absent());
+                    return;
+                }
+                if (success instanceof NormalizedNodeMessage) {
+                    final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
+                    settableFuture.set(Optional.of(data.getNode()));
+                }
+            }
+        }, actorSystem.dispatcher());
+        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+    }
+
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+                                                              final YangInstanceIdentifier path) {
+        final Future<Object> existsScalaFuture =
+                Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
+
+        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
+
+        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+        existsScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    settableFuture.setException((Throwable) success);
+                    return;
+                }
+                settableFuture.set((Boolean) success);
+            }
+        }, actorSystem.dispatcher());
+        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+    }
+
+}
index 0756f33ddfbc445c64ed72c24fb4a3ccb21ac068..9ef11bc63d0b2f4361180e8dd719b889f174c2f1 100644 (file)
@@ -10,27 +10,15 @@ package org.opendaylight.netconf.topology.singleton.impl.tx;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 /**
  * ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master
@@ -38,12 +26,7 @@ import scala.concurrent.Future;
  */
 public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProxyReadTransaction.class);
-
-    private final ActorRef masterTxActor;
-    private final RemoteDeviceId id;
-    private final ActorSystem actorSystem;
-    private final Timeout askTimeout;
+    private final ProxyReadAdapter delegate;
 
     /**
      * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref
@@ -53,75 +36,24 @@ public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
      */
     public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
                                 final Timeout askTimeout) {
-        this.masterTxActor = masterTxActor;
-        this.id = id;
-        this.actorSystem = actorSystem;
-        this.askTimeout = askTimeout;
+        delegate = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
     }
 
     @Override
     public void close() {
-        //noop
+        delegate.close();
     }
 
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
                                                                                    final YangInstanceIdentifier path) {
-        LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
-        final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
-        future.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure,
-                                   final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
-                    settableFuture.setException(exception);
-                    return;
-                }
-                if (success instanceof Throwable) { // Error sended by master
-                    settableFuture.setException((Throwable) success);
-                    return;
-                }
-                if (success instanceof EmptyReadResponse) {
-                    settableFuture.set(Optional.absent());
-                    return;
-                }
-                if (success instanceof NormalizedNodeMessage) {
-                    final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
-                    settableFuture.set(Optional.of(data.getNode()));
-                }
-            }
-        }, actorSystem.dispatcher());
-        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+        return delegate.read(store, path);
     }
 
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
                                                               final YangInstanceIdentifier path) {
-        final Future<Object> existsScalaFuture =
-                Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
-
-        LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
-        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
-        existsScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
-                    settableFuture.setException(exception);
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    settableFuture.setException((Throwable) success);
-                    return;
-                }
-                settableFuture.set((Boolean) success);
-            }
-        }, actorSystem.dispatcher());
-        return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+        return delegate.exists(store, path);
     }
 
 
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java
new file mode 100644 (file)
index 0000000..2791389
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
+ */
+public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
+
+    private final ProxyReadAdapter delegateRead;
+    private final ProxyWriteAdapter delegateWrite;
+
+    /**
+     * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor} ref
+     * @param id            device id
+     * @param actorSystem   system
+     * @param askTimeout
+     */
+    public ProxyReadWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id,
+                                     final ActorSystem actorSystem, final Timeout askTimeout) {
+        delegateRead = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
+        delegateWrite = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
+    }
+
+    @Override
+    public boolean cancel() {
+        return delegateWrite.cancel();
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        return delegateWrite.commit(getIdentifier());
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+                                                                                   final YangInstanceIdentifier path) {
+        return delegateRead.read(store, path);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+                                                              final YangInstanceIdentifier path) {
+        return delegateRead.exists(store, path);
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        delegateWrite.delete(store, path);
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        return delegateWrite.submit(getIdentifier());
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                    final NormalizedNode<?, ?> data) {
+        delegateWrite.put(store, path, data, getIdentifier());
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                      final NormalizedNode<?, ?> data) {
+        delegateWrite.merge(store, path, data, getIdentifier());
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java
new file mode 100644 (file)
index 0000000..e9be9e7
--- /dev/null
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class ProxyWriteAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteAdapter.class);
+
+    private final ActorRef masterTxActor;
+    private final RemoteDeviceId id;
+    private final ActorSystem actorSystem;
+    private final AtomicBoolean opened = new AtomicBoolean(true);
+    private final Timeout askTimeout;
+
+    public ProxyWriteAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+                             final Timeout askTimeout) {
+        this.masterTxActor = masterTxActor;
+        this.id = id;
+        this.actorSystem = actorSystem;
+        this.askTimeout = askTimeout;
+    }
+
+    public boolean cancel() {
+        if (!opened.compareAndSet(true, false)) {
+            return false;
+        }
+        final Future<Object> cancelScalaFuture =
+                Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
+
+        LOG.trace("{}: Cancel {} via NETCONF", id);
+
+        try {
+            // here must be Await because AsyncWriteTransaction do not return future
+            return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+
+    public CheckedFuture<Void, TransactionCommitFailedException> submit(final Object identifier) {
+        if (!opened.compareAndSet(true, false)) {
+            throw new IllegalStateException(id + ": Transaction" + identifier + " is closed");
+        }
+        final Future<Object> submitScalaFuture =
+                Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
+
+        LOG.trace("{}: Submit {} via NETCONF", id);
+
+        final SettableFuture<Void> settableFuture = SettableFuture.create();
+        submitScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+                    settableFuture.setException(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    settableFuture.setException((Throwable) success);
+                } else {
+                    if (success instanceof SubmitFailedReply) {
+                        LOG.error("{}: Transaction was not submitted because already closed.", id);
+                    }
+                    settableFuture.set(null);
+                }
+            }
+        }, actorSystem.dispatcher());
+
+        return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+            @Nullable
+            @Override
+            public TransactionCommitFailedException apply(@Nullable final Exception input) {
+                final String message = "Submit of transaction " + identifier + " failed";
+                return new TransactionCommitFailedException(message, input);
+            }
+        });
+    }
+
+    public ListenableFuture<RpcResult<TransactionStatus>> commit(final Object identifier) {
+        LOG.trace("{}: Commit", id);
+
+        final CheckedFuture<Void, TransactionCommitFailedException> submit = submit(identifier);
+        return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
+            @Nullable
+            @Override
+            public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
+                return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
+            }
+        });
+    }
+
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
+        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
+        masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+    }
+
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                    final NormalizedNode<?, ?> data, final Object identifier) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
+        final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
+        LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, path, data);
+        masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+    }
+
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                      final NormalizedNode<?, ?> data, final Object identifier) {
+        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
+        final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
+        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data);
+        masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+    }
+
+}
index 072f6227b4fc7768c0d3e3a945edcf02e97d3194..eb64eaf8860fee2c7747dc45310491f8dfebe748 100644 (file)
@@ -10,38 +10,17 @@ package org.opendaylight.netconf.topology.singleton.impl.tx;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
 import akka.util.Timeout;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
 import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 
 /**
  * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
@@ -49,13 +28,7 @@ import scala.concurrent.Future;
  */
 public class ProxyWriteTransaction implements DOMDataWriteTransaction {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteTransaction.class);
-
-    private final ActorRef masterTxActor;
-    private final RemoteDeviceId id;
-    private final ActorSystem actorSystem;
-    private final AtomicBoolean opened = new AtomicBoolean(true);
-    private final Timeout askTimeout;
+    private final ProxyWriteAdapter proxyWriteAdapter;
 
     /**
      * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
@@ -65,107 +38,39 @@ public class ProxyWriteTransaction implements DOMDataWriteTransaction {
      */
     public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
                                  final Timeout askTimeout) {
-        this.masterTxActor = masterTxActor;
-        this.id = id;
-        this.actorSystem = actorSystem;
-        this.askTimeout = askTimeout;
+        proxyWriteAdapter = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
     }
 
     @Override
     public boolean cancel() {
-        if (!opened.compareAndSet(true, false)) {
-            return false;
-        }
-        final Future<Object> cancelScalaFuture =
-                Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
-
-        LOG.trace("{}: Cancel {} via NETCONF", id);
-
-        try {
-            // here must be Await because AsyncWriteTransaction do not return future
-            return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
-        } catch (final Exception e) {
-            return false;
-        }
+        return proxyWriteAdapter.cancel();
     }
 
     @Override
     public CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        if (!opened.compareAndSet(true, false)) {
-            throw new IllegalStateException(id + ": Transaction" + getIdentifier() + " is closed");
-        }
-        final Future<Object> submitScalaFuture =
-                Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
-
-        LOG.trace("{}: Submit {} via NETCONF", id);
-
-        final SettableFuture<Void> settableFuture = SettableFuture.create();
-        submitScalaFuture.onComplete(new OnComplete<Object>() {
-            @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
-                if (failure != null) { // ask timeout
-                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
-                    settableFuture.setException(exception);
-                    return;
-                }
-                if (success instanceof Throwable) {
-                    settableFuture.setException((Throwable) success);
-                } else {
-                    if (success instanceof SubmitFailedReply) {
-                        LOG.error("{}: Transaction was not submitted because already closed.", id);
-                    }
-                    settableFuture.set(null);
-                }
-            }
-        }, actorSystem.dispatcher());
-
-        return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
-            @Nullable
-            @Override
-            public TransactionCommitFailedException apply(@Nullable final Exception input) {
-                final String message = "Submit of transaction " + getIdentifier() + " failed";
-                return new TransactionCommitFailedException(message, input);
-            }
-        });
+        return proxyWriteAdapter.submit(getIdentifier());
     }
 
     @Override
     public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-        LOG.trace("{}: Commit", id);
-
-        final CheckedFuture<Void, TransactionCommitFailedException> submit = submit();
-        return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
-            @Nullable
-            @Override
-            public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
-                return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
-            }
-        });
+        return proxyWriteAdapter.commit(getIdentifier());
     }
 
     @Override
     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
-        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
-        LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
-        masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+        proxyWriteAdapter.delete(store, identifier);
     }
 
     @Override
     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
                     final NormalizedNode<?, ?> data) {
-        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
-        final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
-        LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, identifier, data);
-        masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+        proxyWriteAdapter.put(store, identifier, data, getIdentifier());
     }
 
     @Override
     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
                       final NormalizedNode<?, ?> data) {
-        Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
-        final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
-        LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, identifier, data);
-        masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+        proxyWriteAdapter.merge(store, identifier, data, getIdentifier());
     }
 
     @Override
index 7a73fe2e90e0fcecaffcd0a28290782ade7f32e5..72d8e9e84926f06862d076c4ed941c40d3ada33d 100644 (file)
@@ -8,6 +8,6 @@
 
 package org.opendaylight.netconf.topology.singleton.messages.transactions;
 
-public class CancelRequest implements TransactionRequest {
+public class CancelRequest implements WriteActorMessage {
     private static final long serialVersionUID = 1L;
 }
index 1548dc791ae4e639231a1c8ebe504e79fac3612e..5e46157e40e046268f3a86ae4930be3a0f073e7d 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class DeleteRequest implements TransactionRequest {
+public class DeleteRequest implements WriteActorMessage {
     private static final long serialVersionUID = 1L;
 
     private final LogicalDatastoreType store;
index b5fae5f02931f434babca317b12e22c33b77855d..9bce961fcd42935c5aaff699b402306bb9f1208a 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class ExistsRequest implements TransactionRequest {
+public class ExistsRequest implements ReadActorMessage {
     private static final long serialVersionUID = 1L;
 
     private final LogicalDatastoreType store;
index 8c03023654f295393e791819cdf20cb3cf22dcf7..536d52172d1d3f07590f40223bea23f1c2d2a1fe 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
 
-public class MergeRequest implements TransactionRequest {
+public class MergeRequest implements WriteActorMessage {
     private static final long serialVersionUID = 1L;
 
     private final NormalizedNodeMessage data;
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionReply.java
new file mode 100644 (file)
index 0000000..141eccc
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewReadWriteTransactionReply implements Serializable {
+
+    private final ActorRef txActor;
+
+    public NewReadWriteTransactionReply(final ActorRef txActor) {
+        this.txActor = txActor;
+    }
+
+    public ActorRef getTxActor() {
+        return txActor;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/NewReadWriteTransactionRequest.java
new file mode 100644 (file)
index 0000000..9a9a585
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+public class NewReadWriteTransactionRequest implements Serializable {
+    private static final long serialVersionUID = 1L;
+}
index 41de9c22b7daf1a0ef434ad79633a4e780793d95..c9084b813eb7a7a97e4d2afa7574abe7c9a74942 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
 
-public class PutRequest implements TransactionRequest {
+public class PutRequest implements WriteActorMessage {
     private static final long serialVersionUID = 1L;
 
     private final LogicalDatastoreType store;
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadActorMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadActorMessage.java
new file mode 100644 (file)
index 0000000..88bae04
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+public interface ReadActorMessage extends TransactionRequest {
+}
index d950f285576bd8a1a3fcfda8c8eb13554ed1efbd..b64f46cd9177930d245f4ab537de9232a4240f06 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages.transactions;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class ReadRequest implements TransactionRequest {
+public class ReadRequest implements ReadActorMessage {
     private static final long serialVersionUID = 1L;
 
     private final LogicalDatastoreType store;
index 6b6af7e77310cfee9b63a98cc0d797f595156016..64da32a0fe2ec0fb4e26ecf403a2f17c03cd8627 100644 (file)
@@ -8,6 +8,6 @@
 
 package org.opendaylight.netconf.topology.singleton.messages.transactions;
 
-public class SubmitRequest implements TransactionRequest {
+public class SubmitRequest implements WriteActorMessage {
     private static final long serialVersionUID = 1L;
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/WriteActorMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/WriteActorMessage.java
new file mode 100644 (file)
index 0000000..07af556
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+public interface WriteActorMessage extends TransactionRequest {
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/ReadWriteTransactionActorTest.java
new file mode 100644 (file)
index 0000000..08b755e
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class ReadWriteTransactionActorTest {
+
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+    private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+
+    @Mock
+    private DOMDataReadWriteTransaction deviceReadWriteTx;
+    private TestProbe probe;
+    private ActorSystem system;
+    private TestActorRef<WriteTransactionActor> actorRef;
+    private NormalizedNode<?, ?> node;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.apply();
+        probe = TestProbe.apply(system);
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(deviceReadWriteTx,
+                Duration.apply(2, TimeUnit.SECONDS)), "testA");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final ContainerNode node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
+        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+        verify(deviceReadWriteTx).read(STORE, PATH);
+        probe.expectMsgClass(NormalizedNodeMessage.class);
+    }
+
+    @Test
+    public void testReadEmpty() throws Exception {
+        when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+        verify(deviceReadWriteTx).read(STORE, PATH);
+        probe.expectMsgClass(EmptyReadResponse.class);
+    }
+
+    @Test
+    public void testReadFailure() throws Exception {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+        verify(deviceReadWriteTx).read(STORE, PATH);
+        probe.expectMsg(cause);
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
+        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+        verify(deviceReadWriteTx).exists(STORE, PATH);
+        probe.expectMsg(true);
+    }
+
+    @Test
+    public void testExistsFailure() throws Exception {
+        final ReadFailedException cause = new ReadFailedException("fail");
+        when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+        verify(deviceReadWriteTx).exists(STORE, PATH);
+        probe.expectMsg(cause);
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+        actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(deviceReadWriteTx).put(STORE, PATH, node);
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+        actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
+        verify(deviceReadWriteTx).merge(STORE, PATH, node);
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
+        verify(deviceReadWriteTx).delete(STORE, PATH);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        when(deviceReadWriteTx.cancel()).thenReturn(true);
+        final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
+        final Object result = Await.result(cancelFuture, TIMEOUT.duration());
+        Preconditions.checkState(result instanceof Boolean);
+        verify(deviceReadWriteTx).cancel();
+        Assert.assertTrue((Boolean) result);
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+        final Object result = Await.result(submitFuture, TIMEOUT.duration());
+        Assert.assertTrue(result instanceof SubmitReply);
+        verify(deviceReadWriteTx).submit();
+    }
+
+    @Test
+    public void testSubmitFail() throws Exception {
+        final RpcError rpcError =
+                RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
+        final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
+        when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+        final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+        final Object result = Await.result(submitFuture, TIMEOUT.duration());
+        Assert.assertEquals(cause, result);
+        verify(deviceReadWriteTx).submit();
+    }
+
+    @Test
+    public void testIdleTimeout() throws Exception {
+        final TestProbe probe = new TestProbe(system);
+        probe.watch(actorRef);
+        verify(deviceReadWriteTx, timeout(3000)).cancel();
+        probe.expectTerminated(actorRef, TIMEOUT.duration());
+    }
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransactionTest.java
new file mode 100644 (file)
index 0000000..a24f2b1
--- /dev/null
@@ -0,0 +1,247 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyReadWriteTransactionTest {
+    private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+    private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+    private ActorSystem system;
+    private TestProbe masterActor;
+    private ContainerNode node;
+    private ProxyReadWriteTransaction tx;
+
+    @Before
+    public void setUp() throws Exception {
+        system = ActorSystem.apply();
+        masterActor = new TestProbe(system);
+        final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+        node = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+                .build();
+        tx = new ProxyReadWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+        masterActor.expectMsgClass(CancelRequest.class);
+        masterActor.reply(true);
+        Assert.assertTrue(submit.get());
+    }
+
+    @Test
+    public void testCancelSubmitted() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+        final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+        masterActor.expectNoMsg();
+        Assert.assertFalse(submit.get());
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+    }
+
+    @Test
+    public void testDoubleSubmit() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submitFuture.checkedGet();
+        try {
+            tx.submit().checkedGet();
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        final ListenableFuture<RpcResult<TransactionStatus>> submitFuture = tx.commit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        Assert.assertEquals(TransactionStatus.SUBMITED, submitFuture.get().getResult());
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        tx.delete(STORE, PATH);
+        masterActor.expectMsgClass(DeleteRequest.class);
+    }
+
+    @Test
+    public void testDeleteClosed() throws Exception {
+        submit();
+        try {
+            tx.delete(STORE, PATH);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        tx.put(STORE, PATH, node);
+        masterActor.expectMsgClass(PutRequest.class);
+    }
+
+    @Test
+    public void testPutClosed() throws Exception {
+        submit();
+        try {
+            tx.put(STORE, PATH, node);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        tx.merge(STORE, PATH, node);
+        masterActor.expectMsgClass(MergeRequest.class);
+    }
+
+    @Test
+    public void testMergeClosed() throws Exception {
+        submit();
+        try {
+            tx.merge(STORE, PATH, node);
+            Assert.fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            masterActor.expectNoMsg();
+        }
+    }
+
+    @Test
+    public void testGetIdentifier() throws Exception {
+        Assert.assertEquals(tx, tx.getIdentifier());
+    }
+
+    private void submit() throws TransactionCommitFailedException {
+        final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+        masterActor.expectMsgClass(SubmitRequest.class);
+        masterActor.reply(new SubmitReply());
+        submit.checkedGet();
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new NormalizedNodeMessage(PATH, node));
+        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+        Assert.assertTrue(result.isPresent());
+        Assert.assertEquals(node, result.get());
+    }
+
+    @Test
+    public void testReadEmpty() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new EmptyReadResponse());
+        final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+        Assert.assertFalse(result.isPresent());
+    }
+
+    @Test(expected = ReadFailedException.class)
+    public void testReadFail() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        masterActor.reply(new RuntimeException("fail"));
+        read.checkedGet();
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+        masterActor.expectMsgClass(ExistsRequest.class);
+        masterActor.reply(true);
+        final Boolean result = read.checkedGet();
+        Assert.assertTrue(result);
+    }
+
+    @Test(expected = ReadFailedException.class)
+    public void testExistsFail() throws Exception {
+        final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+        masterActor.expectMsgClass(ExistsRequest.class);
+        masterActor.reply(new RuntimeException("fail"));
+        read.checkedGet();
+    }
+
+    @Test
+    public void testMasterDownRead() throws Exception {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+        masterActor.expectMsgClass(ReadRequest.class);
+        //master doesn't reply
+        try {
+            read.checkedGet();
+            Assert.fail("Exception should be thrown");
+        } catch (final ReadFailedException e) {
+            final Throwable cause = e.getCause();
+            Assert.assertTrue(cause instanceof DocumentedException);
+            final DocumentedException de = (DocumentedException) cause;
+            Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
+            Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
+            Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+        }
+    }
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadWriteTransactionTest.java
new file mode 100644 (file)
index 0000000..ce6bf3e
--- /dev/null
@@ -0,0 +1,358 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import static junit.framework.TestCase.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class ReadWriteTransactionTest {
+    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
+    private static final int TIMEOUT_SEC = 5;
+    private static ActorSystem system;
+
+    @Rule
+    public final ExpectedException exception = ExpectedException.none();
+
+    @Mock
+    private DOMDataBroker deviceDataBroker;
+    @Mock
+    private DOMDataReadWriteTransaction readWriteTx;
+    @Mock
+    private DOMRpcService domRpcService;
+    private ActorRef masterRef;
+    private ProxyDOMDataBroker slaveDataBroker;
+    private List<SourceIdentifier> sourceIdentifiers;
+    private NormalizedNode<?, ?> testNode;
+    private YangInstanceIdentifier instanceIdentifier;
+    private LogicalDatastoreType storeType;
+
+    @Before
+    public void setup() throws Exception {
+        initMocks(this);
+
+        system = ActorSystem.create();
+
+        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
+                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+
+        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+        doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
+        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
+                DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);
+
+        masterRef = TestActorRef.create(system, props, "master_read");
+
+        sourceIdentifiers = Lists.newArrayList();
+
+        doReturn(readWriteTx).when(deviceDataBroker).newReadWriteTransaction();
+        doNothing().when(readWriteTx).put(storeType, instanceIdentifier, testNode);
+        doNothing().when(readWriteTx).merge(storeType, instanceIdentifier, testNode);
+        doNothing().when(readWriteTx).delete(storeType, instanceIdentifier);
+
+        // Create slave data broker for testing proxy
+        slaveDataBroker =
+                new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
+        initializeDataTest();
+        testNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+                .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+        instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        storeType = LogicalDatastoreType.CONFIGURATION;
+    }
+
+    @After
+    public void teardown() {
+        JavaTestKit.shutdownActorSystem(system, null, true);
+        system = null;
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        // Test of invoking put on master through slave proxy
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+        wTx.put(storeType, instanceIdentifier, testNode);
+
+        verify(readWriteTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
+
+        wTx.cancel();
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        // Test of invoking merge on master through slave proxy
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+        wTx.merge(storeType, instanceIdentifier, testNode);
+
+        verify(readWriteTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
+
+        wTx.cancel();
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+
+        // Test of invoking delete on master through slave proxy
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+        wTx.delete(storeType, instanceIdentifier);
+        wTx.cancel();
+
+        verify(readWriteTx, timeout(2000)).delete(storeType, instanceIdentifier);
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
+        doReturn(resultSubmit).when(readWriteTx).submit();
+
+        // Without Tx
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
+
+        final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertNull(result);
+    }
+
+    @Test
+    public void testSubmitWithOperation() throws Exception {
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
+        doReturn(resultSubmitTx).when(readWriteTx).submit();
+        // With Tx
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+        wTx.delete(LogicalDatastoreType.CONFIGURATION,
+                YangInstanceIdentifier.EMPTY);
+
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
+
+        final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertNull(resultTx);
+    }
+
+    @Test
+    public void testSubmitFail() throws Exception {
+        final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
+        final CheckedFuture<Void, TransactionCommitFailedException> resultThrowable =
+                Futures.immediateFailedCheckedFuture(throwable);
+        doReturn(resultThrowable).when(readWriteTx).submit();
+
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+        wTx.delete(LogicalDatastoreType.CONFIGURATION,
+                YangInstanceIdentifier.EMPTY);
+        final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
+                wTx.submit();
+        exception.expect(TransactionCommitFailedException.class);
+        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        doReturn(true).when(readWriteTx).cancel();
+
+        // Without Tx
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+        final Boolean resultFalseNoTx = wTx.cancel();
+        assertEquals(true, resultFalseNoTx);
+    }
+
+    @Test
+    public void testCancelWithOperation() throws Exception {
+        doReturn(true).when(readWriteTx).cancel();
+
+        // With Tx, readWriteTx test
+        final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+        wTx.delete(LogicalDatastoreType.CONFIGURATION,
+                YangInstanceIdentifier.EMPTY);
+
+        final Boolean resultTrue = wTx.cancel();
+        assertEquals(true, resultTrue);
+
+        final Boolean resultFalse = wTx.cancel();
+        assertEquals(false, resultFalse);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        // Message: NormalizedNodeMessage
+        final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+                .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNormalizedNodeMessage =
+                Futures.immediateCheckedFuture(Optional.of(outputNode));
+        doReturn(resultNormalizedNodeMessage).when(readWriteTx).read(storeType, instanceIdentifier);
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNodeMessageResponse =
+                slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
+
+        final Optional<NormalizedNode<?, ?>> resultNodeMessage =
+                resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertTrue(resultNodeMessage.isPresent());
+        assertEquals(resultNodeMessage.get(), outputNode);
+    }
+
+    @Test
+    public void testReadEmpty() throws Exception {
+        // Message: EmptyReadResponse
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmpty =
+                Futures.immediateCheckedFuture(Optional.absent());
+        doReturn(resultEmpty).when(readWriteTx).read(storeType, instanceIdentifier);
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmptyResponse =
+                slaveDataBroker.newReadWriteTransaction().read(storeType,
+                        instanceIdentifier);
+
+        final Optional<NormalizedNode<?, ?>> resultEmptyMessage =
+                resultEmptyResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(resultEmptyMessage, Optional.absent());
+    }
+
+    @Test
+    public void testReadFail() throws Exception {
+        // Message: Throwable
+        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowable =
+                Futures.immediateFailedCheckedFuture(readFailedException);
+
+        doReturn(resultThrowable).when(readWriteTx).read(storeType, instanceIdentifier);
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowableResponse =
+                slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
+
+        exception.expect(ReadFailedException.class);
+        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testExist() throws Exception {
+        // Message: True
+        final CheckedFuture<Boolean, ReadFailedException> resultTrue =
+                Futures.immediateCheckedFuture(true);
+        doReturn(resultTrue).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> trueResponse =
+                slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
+
+        final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(true, trueMessage);
+    }
+
+    @Test
+    public void testExistsNull() throws Exception {
+        // Message: False, result null
+        final CheckedFuture<Boolean, ReadFailedException> resultNull = Futures.immediateCheckedFuture(null);
+        doReturn(resultNull).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> nullResponse =
+                slaveDataBroker.newReadWriteTransaction().exists(storeType,
+                        instanceIdentifier);
+
+        final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(false, nullFalseMessage);
+    }
+
+    @Test
+    public void testExistsFalse() throws Exception {
+        // Message: False
+        final CheckedFuture<Boolean, ReadFailedException> resultFalse = Futures.immediateCheckedFuture(false);
+        doReturn(resultFalse).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> falseResponse =
+                slaveDataBroker.newReadWriteTransaction().exists(storeType,
+                        instanceIdentifier);
+
+        final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(false, falseMessage);
+    }
+
+    @Test
+    public void testExistsFail() throws Exception {
+        // Message: Throwable
+        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
+        final CheckedFuture<Boolean, ReadFailedException> resultThrowable =
+                Futures.immediateFailedCheckedFuture(readFailedException);
+        doReturn(resultThrowable).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> resultThrowableResponse =
+                slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
+
+        exception.expect(ReadFailedException.class);
+        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+    }
+
+    private void initializeDataTest() throws Exception {
+        final Future<Object> initialDataToActor =
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
+                        domRpcService), TIMEOUT);
+
+        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
+
+        assertTrue(success instanceof MasterActorDataInitialized);
+    }
+
+}