2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.ReceiveTimeout;
14 import akka.actor.UntypedActor;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
22 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
23 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
24 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
25 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
26 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.Duration;
34 * WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
36 public class WriteTransactionActor extends UntypedActor {
38 private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionActor.class);
40 private final DOMDataWriteTransaction tx;
41 private final long idleTimeout;
44 * Creates new actor Props.
46 * @param tx delegate device write transaction
47 * @param idleTimeout idle time in seconds, after which transaction is closed automatically
50 static Props props(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
51 return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
54 private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
56 this.idleTimeout = idleTimeout.toSeconds();
57 if (this.idleTimeout > 0) {
58 context().setReceiveTimeout(idleTimeout);
63 public void onReceive(final Object message) throws Throwable {
64 if (message instanceof MergeRequest) {
65 final MergeRequest mergeRequest = (MergeRequest) message;
66 final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
67 tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
68 } else if (message instanceof PutRequest) {
69 final PutRequest putRequest = (PutRequest) message;
70 final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
71 tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
72 } else if (message instanceof DeleteRequest) {
73 final DeleteRequest deleteRequest = (DeleteRequest) message;
74 tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
75 } else if (message instanceof CancelRequest) {
77 } else if (message instanceof SubmitRequest) {
78 submit(sender(), self());
79 } else if (message instanceof ReceiveTimeout) {
80 LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
83 context().stop(self());
89 private void cancel() {
90 final boolean cancelled = tx.cancel();
91 sender().tell(cancelled, self());
92 context().stop(self());
95 private void submit(final ActorRef requester, final ActorRef self) {
96 final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
98 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
100 public void onSuccess(final Void result) {
101 requester.tell(new SubmitReply(), self);
105 public void onFailure(@Nonnull final Throwable throwable) {
106 requester.tell(throwable, self);