Bug 8289 - 409 in cluster restperfclient test
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / WriteTransactionActor.java
index 008559ec9bfc39b4c0b930f07b5c836ec7bb50c2..8a1f53af5297a0772d4e8ca3f80656c1fbe532a4 100644 (file)
@@ -8,24 +8,12 @@
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
-import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.actor.UntypedActor;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -39,11 +27,21 @@ public class WriteTransactionActor extends UntypedActor {
 
     private final DOMDataWriteTransaction tx;
     private final long idleTimeout;
+    private final WriteAdapter writeAdapter;
+
+    private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+        this.tx = tx;
+        this.idleTimeout = idleTimeout.toSeconds();
+        if (this.idleTimeout > 0) {
+            context().setReceiveTimeout(idleTimeout);
+        }
+        writeAdapter = new WriteAdapter(tx);
+    }
 
     /**
      * Creates new actor Props.
      *
-     * @param tx delegate device write transaction
+     * @param tx          delegate device write transaction
      * @param idleTimeout idle time in seconds, after which transaction is closed automatically
      * @return props
      */
@@ -51,31 +49,10 @@ public class WriteTransactionActor extends UntypedActor {
         return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
     }
 
-    private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
-        this.tx = tx;
-        this.idleTimeout = idleTimeout.toSeconds();
-        if (this.idleTimeout > 0) {
-            context().setReceiveTimeout(idleTimeout);
-        }
-    }
-
     @Override
     public void onReceive(final Object message) throws Throwable {
-        if (message instanceof MergeRequest) {
-            final MergeRequest mergeRequest = (MergeRequest) message;
-            final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
-            tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
-        } else if (message instanceof PutRequest) {
-            final PutRequest putRequest = (PutRequest) message;
-            final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
-            tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
-        } else if (message instanceof DeleteRequest) {
-            final DeleteRequest deleteRequest = (DeleteRequest) message;
-            tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
-        } else if (message instanceof CancelRequest) {
-            cancel();
-        } else if (message instanceof SubmitRequest) {
-            submit(sender(), self());
+        if (message instanceof WriteActorMessage) {
+            writeAdapter.handle(message, sender(), context(), self());
         } else if (message instanceof ReceiveTimeout) {
             LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
                     idleTimeout);
@@ -86,25 +63,5 @@ public class WriteTransactionActor extends UntypedActor {
         }
     }
 
-    private void cancel() {
-        final boolean cancelled = tx.cancel();
-        sender().tell(cancelled, self());
-        context().stop(self());
-    }
-
-    private void submit(final ActorRef requester, final ActorRef self) {
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-        context().stop(self);
-        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                requester.tell(new SubmitReply(), self);
-            }
 
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                requester.tell(throwable, self);
-            }
-        });
-    }
 }