Adjust to yangtools-2.0.0 changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Preconditions;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30
31 /**
32  * The ShardTransaction Actor represents a remote transaction that delegates all actions to DOMDataReadWriteTransaction.
33  */
34 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
35     private final ActorRef shardActor;
36     private final ShardStats shardStats;
37     private final TransactionIdentifier transactionId;
38
39     protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
40             final TransactionIdentifier transactionId) {
41         // actor name override used for metering. This does not change the "real" actor name
42         super("shard-tx");
43         this.shardActor = shardActor;
44         this.shardStats = shardStats;
45         this.transactionId = Preconditions.checkNotNull(transactionId);
46     }
47
48     public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
49             final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
50         return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
51     }
52
53     protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
54
55     protected ActorRef getShardActor() {
56         return shardActor;
57     }
58
59     protected final TransactionIdentifier getTransactionId() {
60         return transactionId;
61     }
62
63     @Override
64     public void handleReceive(final Object message) {
65         if (CloseTransaction.isSerializedType(message)) {
66             closeTransaction(true);
67         } else if (message instanceof ReceiveTimeout) {
68             LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionId);
69             closeTransaction(false);
70         } else {
71             unknownMessage(message);
72         }
73     }
74
75     protected boolean returnCloseTransactionReply() {
76         return true;
77     }
78
79     private void closeTransaction(final boolean sendReply) {
80         getDOMStoreTransaction().abortFromTransactionActor();
81         shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());
82
83         if (sendReply && returnCloseTransactionReply()) {
84             getSender().tell(new CloseTransactionReply(), getSelf());
85         }
86
87         getSelf().tell(PoisonPill.getInstance(), getSelf());
88     }
89
90     private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
91         final boolean ret = transaction.isClosed();
92         if (ret) {
93             shardStats.incrementFailedReadTransactionsCount();
94             getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")),
95                     getSelf());
96         }
97         return ret;
98     }
99
100     protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
101         if (checkClosed(transaction)) {
102             return;
103         }
104
105         final YangInstanceIdentifier path = message.getPath();
106         ReadDataReply readDataReply = new ReadDataReply(transaction.getSnapshot().readNode(path).orElse(null),
107             message.getVersion());
108         sender().tell(readDataReply.toSerializable(), self());
109     }
110
111     protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
112         if (checkClosed(transaction)) {
113             return;
114         }
115
116         final YangInstanceIdentifier path = message.getPath();
117         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
118         getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
119     }
120
121     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Some fields are not Serializable but we don't "
122             + "create remote instances of this actor and thus don't need it to be Serializable.")
123     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
124
125         private static final long serialVersionUID = 1L;
126
127         final AbstractShardDataTreeTransaction<?> transaction;
128         final ActorRef shardActor;
129         final DatastoreContext datastoreContext;
130         final ShardStats shardStats;
131         final TransactionType type;
132
133         ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
134                 final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
135             this.transaction = Preconditions.checkNotNull(transaction);
136             this.shardActor = shardActor;
137             this.shardStats = shardStats;
138             this.datastoreContext = datastoreContext;
139             this.type = type;
140         }
141
142         @Override
143         public ShardTransaction create() throws Exception {
144             final ShardTransaction tx;
145             switch (type) {
146                 case READ_ONLY:
147                     tx = new ShardReadTransaction(transaction, shardActor, shardStats);
148                     break;
149                 case READ_WRITE:
150                     tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
151                             shardStats);
152                     break;
153                 case WRITE_ONLY:
154                     tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
155                             shardStats);
156                     break;
157                 default:
158                     throw new IllegalArgumentException("Unhandled transaction type " + type);
159             }
160
161             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
162             return tx;
163         }
164     }
165 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.