87959efe8ae2def5684e253f2e0840c7177db838
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.concurrent.Promise;
28
29 /**
30  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
31  */
32 public class TransactionChainProxy implements DOMStoreTransactionChain {
33
34     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
35
36     private interface State {
37         boolean isReady();
38
39         List<Future<ActorSelection>> getPreviousReadyFutures();
40     }
41
42     private static class Allocated implements State {
43         private final ChainedTransactionProxy transaction;
44
45         Allocated(ChainedTransactionProxy transaction) {
46             this.transaction = transaction;
47         }
48
49         @Override
50         public boolean isReady() {
51             return transaction.isReady();
52         }
53
54         @Override
55         public List<Future<ActorSelection>> getPreviousReadyFutures() {
56             return transaction.getReadyFutures();
57         }
58     }
59
60     private static abstract class AbstractDefaultState implements State {
61         @Override
62         public List<Future<ActorSelection>> getPreviousReadyFutures() {
63             return Collections.emptyList();
64         }
65     }
66
67     private static final State IDLE_STATE = new AbstractDefaultState() {
68         @Override
69         public boolean isReady() {
70             return true;
71         }
72     };
73
74     private static final State CLOSED_STATE = new AbstractDefaultState() {
75         @Override
76         public boolean isReady() {
77             throw new TransactionChainClosedException("Transaction chain has been closed");
78         }
79     };
80
81     private static final AtomicInteger counter = new AtomicInteger(0);
82
83     private final ActorContext actorContext;
84     private final String transactionChainId;
85     private volatile State currentState = IDLE_STATE;
86
87     public TransactionChainProxy(ActorContext actorContext) {
88         this.actorContext = actorContext;
89         transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet();
90     }
91
92     public String getTransactionChainId() {
93         return transactionChainId;
94     }
95
96     @Override
97     public DOMStoreReadTransaction newReadOnlyTransaction() {
98         State localState = currentState;
99         checkReadyState(localState);
100
101         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
102                 transactionChainId, localState.getPreviousReadyFutures());
103     }
104
105     @Override
106     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
107         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
108     }
109
110     @Override
111     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
112         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
113     }
114
115     @Override
116     public void close() {
117         currentState = CLOSED_STATE;
118
119         // Send a close transaction chain request to each and every shard
120         actorContext.broadcast(new CloseTransactionChain(transactionChainId));
121     }
122
123     private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
124         State localState = currentState;
125
126         checkReadyState(localState);
127
128         // Pass the ready Futures from the previous Tx.
129         ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type,
130                 transactionChainId, localState.getPreviousReadyFutures());
131
132         currentState = new Allocated(txProxy);
133
134         return txProxy;
135     }
136
137     private void checkReadyState(State state) {
138         Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
139     }
140
141     private static class ChainedTransactionProxy extends TransactionProxy {
142
143         /**
144          * Stores the ready Futures from the previous Tx in the chain.
145          */
146         private final List<Future<ActorSelection>> previousReadyFutures;
147
148         /**
149          * Stores the ready Futures from this transaction when it is readied.
150          */
151         private volatile List<Future<ActorSelection>> readyFutures;
152
153         private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
154                 String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
155             super(actorContext, transactionType, transactionChainId);
156             this.previousReadyFutures = previousReadyFutures;
157         }
158
159         List<Future<ActorSelection>> getReadyFutures() {
160             return readyFutures;
161         }
162
163         boolean isReady() {
164             return readyFutures != null;
165         }
166
167         @Override
168         protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
169             LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
170                     readyFutures.size(), getTransactionChainId());
171             this.readyFutures = readyFutures;
172         }
173
174         /**
175          * This method is overridden to ensure the previous Tx's ready operations complete
176          * before we create the next shard Tx in the chain to avoid creation failures if the
177          * previous Tx's ready operations haven't completed yet.
178          */
179         @Override
180         protected Future<Object> sendCreateTransaction(final ActorSelection shard,
181                 final Object serializedCreateMessage) {
182
183             // Check if there are any previous ready Futures, otherwise let the super class handle it.
184             if(previousReadyFutures.isEmpty()) {
185                 return super.sendCreateTransaction(shard, serializedCreateMessage);
186             }
187
188             // Combine the ready Futures into 1.
189             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
190                     previousReadyFutures, getActorContext().getActorSystem().dispatcher());
191
192             // Add a callback for completion of the combined Futures.
193             final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
194             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
195                 @Override
196                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
197                     if(failure != null) {
198                         // A Ready Future failed so fail the returned Promise.
199                         createTxPromise.failure(failure);
200                     } else {
201                         LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
202                                 getIdentifier(), getTransactionChainId());
203
204                         // Send the CreateTx message and use the resulting Future to complete the
205                         // returned Promise.
206                         createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
207                                 serializedCreateMessage));
208                     }
209                 }
210             };
211
212             combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
213
214             return createTxPromise.future();
215         }
216     }
217 }