Bug 8289 - 409 in cluster restperfclient test
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / WriteTransactionActor.java
index 5350b6478ed317e6be5e18e6b6f58a7a76435912..8a1f53af5297a0772d4e8ca3f80656c1fbe532a4 100644 (file)
@@ -8,86 +8,60 @@
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
-import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
 import akka.actor.UntypedActor;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 /**
  * WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
  */
 public class WriteTransactionActor extends UntypedActor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionActor.class);
+
     private final DOMDataWriteTransaction tx;
+    private final long idleTimeout;
+    private final WriteAdapter writeAdapter;
+
+    private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+        this.tx = tx;
+        this.idleTimeout = idleTimeout.toSeconds();
+        if (this.idleTimeout > 0) {
+            context().setReceiveTimeout(idleTimeout);
+        }
+        writeAdapter = new WriteAdapter(tx);
+    }
 
     /**
      * Creates new actor Props.
      *
-     * @param tx delegate device write transaction
+     * @param tx          delegate device write transaction
+     * @param idleTimeout idle time in seconds, after which transaction is closed automatically
      * @return props
      */
-    static Props props(final DOMDataWriteTransaction tx) {
-        return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
-    }
-
-    private WriteTransactionActor(final DOMDataWriteTransaction tx) {
-        this.tx = tx;
+    static Props props(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+        return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
     }
 
     @Override
     public void onReceive(final Object message) throws Throwable {
-        if (message instanceof MergeRequest) {
-            final MergeRequest mergeRequest = (MergeRequest) message;
-            final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
-            tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
-        } else if (message instanceof PutRequest) {
-            final PutRequest putRequest = (PutRequest) message;
-            final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
-            tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
-        } else if (message instanceof DeleteRequest) {
-            final DeleteRequest deleteRequest = (DeleteRequest) message;
-            tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
-        } else if (message instanceof CancelRequest) {
-            cancel();
-        } else if (message instanceof SubmitRequest) {
-            submit(sender(), self());
+        if (message instanceof WriteActorMessage) {
+            writeAdapter.handle(message, sender(), context(), self());
+        } else if (message instanceof ReceiveTimeout) {
+            LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
+                    idleTimeout);
+            tx.cancel();
+            context().stop(self());
         } else {
             unhandled(message);
         }
     }
 
-    private void cancel() {
-        final boolean cancelled = tx.cancel();
-        sender().tell(cancelled, self());
-        context().stop(self());
-    }
 
-    private void submit(final ActorRef requester, final ActorRef self) {
-        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-        context().stop(self);
-        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                requester.tell(new SubmitReply(), self);
-            }
-
-            @Override
-            public void onFailure(@Nonnull final Throwable throwable) {
-                requester.tell(throwable, self);
-            }
-        });
-    }
 }