Bug 8152: Transaction is already opened
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / WriteTransactionActor.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.ReceiveTimeout;
14 import akka.actor.UntypedActor;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
22 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
23 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
24 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
25 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
26 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.Duration;
32
33 /**
34  * WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
35  */
36 public class WriteTransactionActor extends UntypedActor {
37
38     private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionActor.class);
39
40     private final DOMDataWriteTransaction tx;
41     private final long idleTimeout;
42
43     /**
44      * Creates new actor Props.
45      *
46      * @param tx delegate device write transaction
47      * @param idleTimeout idle time in seconds, after which transaction is closed automatically
48      * @return props
49      */
50     static Props props(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
51         return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
52     }
53
54     private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
55         this.tx = tx;
56         this.idleTimeout = idleTimeout.toSeconds();
57         if (this.idleTimeout > 0) {
58             context().setReceiveTimeout(idleTimeout);
59         }
60     }
61
62     @Override
63     public void onReceive(final Object message) throws Throwable {
64         if (message instanceof MergeRequest) {
65             final MergeRequest mergeRequest = (MergeRequest) message;
66             final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
67             tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
68         } else if (message instanceof PutRequest) {
69             final PutRequest putRequest = (PutRequest) message;
70             final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
71             tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
72         } else if (message instanceof DeleteRequest) {
73             final DeleteRequest deleteRequest = (DeleteRequest) message;
74             tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
75         } else if (message instanceof CancelRequest) {
76             cancel();
77         } else if (message instanceof SubmitRequest) {
78             submit(sender(), self());
79         } else if (message instanceof ReceiveTimeout) {
80             LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
81                     idleTimeout);
82             tx.cancel();
83             context().stop(self());
84         } else {
85             unhandled(message);
86         }
87     }
88
89     private void cancel() {
90         final boolean cancelled = tx.cancel();
91         sender().tell(cancelled, self());
92         context().stop(self());
93     }
94
95     private void submit(final ActorRef requester, final ActorRef self) {
96         final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
97         context().stop(self);
98         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
99             @Override
100             public void onSuccess(final Void result) {
101                 requester.tell(new SubmitReply(), self);
102             }
103
104             @Override
105             public void onFailure(@Nonnull final Throwable throwable) {
106                 requester.tell(throwable, self);
107             }
108         });
109     }
110 }