BUG-5280: switch transactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
20 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
29
30 /**
31  * The ShardTransaction Actor represents a remote transaction
32  * <p>
33  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
34  * </p>
35  * <p>
36  * Handles Messages <br/>
37  * ---------------- <br/>
38  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
39  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
40  * </p>
41  */
42 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
43     private final ActorRef shardActor;
44     private final ShardStats shardStats;
45     private final String transactionID;
46
47     protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID) {
48         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
49         this.shardActor = shardActor;
50         this.shardStats = shardStats;
51         this.transactionID = Preconditions.checkNotNull(transactionID);
52     }
53
54     public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
55             DatastoreContext datastoreContext, ShardStats shardStats) {
56         return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
57     }
58
59     protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
60
61     protected ActorRef getShardActor() {
62         return shardActor;
63     }
64
65     protected String getTransactionID() {
66         return transactionID;
67     }
68
69     @Override
70     public void handleReceive(Object message) {
71         if (CloseTransaction.isSerializedType(message)) {
72             closeTransaction(true);
73         } else if (message instanceof ReceiveTimeout) {
74             LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionID);
75             closeTransaction(false);
76         } else {
77             unknownMessage(message);
78         }
79     }
80
81     protected boolean returnCloseTransactionReply() {
82         return true;
83     }
84
85     private void closeTransaction(boolean sendReply) {
86         getDOMStoreTransaction().abort();
87
88         if(sendReply && returnCloseTransactionReply()) {
89             getSender().tell(new CloseTransactionReply(), getSelf());
90         }
91
92         getSelf().tell(PoisonPill.getInstance(), getSelf());
93     }
94
95     private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
96         final boolean ret = transaction.isClosed();
97         if (ret) {
98             shardStats.incrementFailedReadTransactionsCount();
99             getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
100         }
101         return ret;
102     }
103
104     protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
105         if (checkClosed(transaction)) {
106             return;
107         }
108
109         final YangInstanceIdentifier path = message.getPath();
110         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
111         ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
112         sender().tell(readDataReply.toSerializable(), self());
113     }
114
115     protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
116         if (checkClosed(transaction)) {
117             return;
118         }
119
120         final YangInstanceIdentifier path = message.getPath();
121         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
122         getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
123     }
124
125     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
126
127         private static final long serialVersionUID = 1L;
128
129         final AbstractShardDataTreeTransaction<?> transaction;
130         final ActorRef shardActor;
131         final DatastoreContext datastoreContext;
132         final ShardStats shardStats;
133         final TransactionType type;
134
135         ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
136                 DatastoreContext datastoreContext, ShardStats shardStats) {
137             this.transaction = Preconditions.checkNotNull(transaction);
138             this.shardActor = shardActor;
139             this.shardStats = shardStats;
140             this.datastoreContext = datastoreContext;
141             this.type = type;
142         }
143
144         @Override
145         public ShardTransaction create() throws Exception {
146             final ShardTransaction tx;
147             switch (type) {
148             case READ_ONLY:
149                 tx = new ShardReadTransaction(transaction, shardActor, shardStats);
150                 break;
151             case READ_WRITE:
152                 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor, shardStats);
153                 break;
154             case WRITE_ONLY:
155                 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor, shardStats);
156                 break;
157             default:
158                 throw new IllegalArgumentException("Unhandled transaction type " + type);
159             }
160
161             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
162             return tx;
163         }
164     }
165 }