97426402bc08e7dca32328d852675a03ad08d7f9
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.actor.ReceiveTimeout;
16 import akka.japi.Creator;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.mdsal.common.api.ReadFailedException;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29
30 /**
31  * The ShardTransaction Actor represents a remote transaction that delegates all actions to DOMDataReadWriteTransaction.
32  */
33 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
34     private final ActorRef shardActor;
35     private final ShardStats shardStats;
36     private final TransactionIdentifier transactionId;
37
38     protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
39             final TransactionIdentifier transactionId) {
40         // actor name override used for metering. This does not change the "real" actor name
41         super("shard-tx");
42         this.shardActor = shardActor;
43         this.shardStats = shardStats;
44         this.transactionId = requireNonNull(transactionId);
45     }
46
47     public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
48             final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
49         return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
50     }
51
52     protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
53
54     protected ActorRef getShardActor() {
55         return shardActor;
56     }
57
58     protected final TransactionIdentifier getTransactionId() {
59         return transactionId;
60     }
61
62     @Override
63     public void handleReceive(final Object message) {
64         if (CloseTransaction.isSerializedType(message)) {
65             closeTransaction(true);
66         } else if (message instanceof ReceiveTimeout) {
67             LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionId);
68             closeTransaction(false);
69         } else {
70             unknownMessage(message);
71         }
72     }
73
74     protected boolean returnCloseTransactionReply() {
75         return true;
76     }
77
78     private void closeTransaction(final boolean sendReply) {
79         getDOMStoreTransaction().abortFromTransactionActor();
80
81         if (sendReply && returnCloseTransactionReply()) {
82             getSender().tell(new CloseTransactionReply(), getSelf());
83         }
84
85         getSelf().tell(PoisonPill.getInstance(), getSelf());
86     }
87
88     private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
89         final boolean ret = transaction.isClosed();
90         if (ret) {
91             shardStats.incrementFailedReadTransactionsCount();
92             getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")),
93                     getSelf());
94         }
95         return ret;
96     }
97
98     protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
99         if (checkClosed(transaction)) {
100             return;
101         }
102
103         final YangInstanceIdentifier path = message.getPath();
104         ReadDataReply readDataReply = new ReadDataReply(transaction.getSnapshot().readNode(path).orElse(null),
105             message.getVersion());
106         sender().tell(readDataReply.toSerializable(), self());
107     }
108
109     protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
110         if (checkClosed(transaction)) {
111             return;
112         }
113
114         final YangInstanceIdentifier path = message.getPath();
115         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
116         getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
117     }
118
119     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Some fields are not Serializable but we don't "
120             + "create remote instances of this actor and thus don't need it to be Serializable.")
121     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
122
123         private static final long serialVersionUID = 1L;
124
125         final AbstractShardDataTreeTransaction<?> transaction;
126         final ActorRef shardActor;
127         final DatastoreContext datastoreContext;
128         final ShardStats shardStats;
129         final TransactionType type;
130
131         ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
132                 final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
133             this.transaction = requireNonNull(transaction);
134             this.shardActor = shardActor;
135             this.shardStats = shardStats;
136             this.datastoreContext = datastoreContext;
137             this.type = type;
138         }
139
140         @Override
141         public ShardTransaction create() {
142             final ShardTransaction tx;
143             switch (type) {
144                 case READ_ONLY:
145                     tx = new ShardReadTransaction(transaction, shardActor, shardStats);
146                     break;
147                 case READ_WRITE:
148                     tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
149                             shardStats);
150                     break;
151                 case WRITE_ONLY:
152                     tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
153                             shardStats);
154                     break;
155                 default:
156                     throw new IllegalArgumentException("Unhandled transaction type " + type);
157             }
158
159             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
160             return tx;
161         }
162     }
163 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.