92de88e1126c19c38718f0f7c5c8cd51ad8430c0
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.AbstractMap.SimpleEntry;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
19 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
25 import scala.concurrent.Future;
26 import scala.concurrent.Promise;
27
28 /**
29  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
30  */
31 public class TransactionChainProxy implements DOMStoreTransactionChain {
32     private interface State {
33         boolean isReady();
34
35         SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures();
36
37         void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures);
38     }
39
40     private static class Allocated implements State {
41         private volatile SimpleEntry<Object, List<Future<ActorSelection>>> readyFutures;
42
43         @Override
44         public boolean isReady() {
45             return readyFutures != null;
46         }
47
48         @Override
49         public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
50             return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES;
51         }
52
53         @Override
54         public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
55             this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures);
56         }
57     }
58
59     private static abstract class AbstractDefaultState implements State {
60         @Override
61         public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
62             return EMPTY_READY_FUTURES;
63         }
64
65         @Override
66         public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
67             throw new IllegalStateException("No transaction is allocated");
68         }
69     }
70
71     private static final State IDLE_STATE = new AbstractDefaultState() {
72         @Override
73         public boolean isReady() {
74             return true;
75         }
76     };
77
78     private static final State CLOSED_STATE = new AbstractDefaultState() {
79         @Override
80         public boolean isReady() {
81             throw new TransactionChainClosedException("Transaction chain has been closed");
82         }
83     };
84
85     private static final SimpleEntry<Object, List<Future<ActorSelection>>> EMPTY_READY_FUTURES =
86             new SimpleEntry<Object, List<Future<ActorSelection>>>("",
87                     Collections.<Future<ActorSelection>>emptyList());
88
89     private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
90             AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state");
91
92     private final ActorContext actorContext;
93     private final String transactionChainId;
94     private volatile State state = IDLE_STATE;
95
96     public TransactionChainProxy(ActorContext actorContext) {
97         this.actorContext = actorContext;
98         transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis();
99     }
100
101     @Override
102     public DOMStoreReadTransaction newReadOnlyTransaction() {
103         checkReadyState();
104         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
105     }
106
107     @Override
108     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
109         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
110     }
111
112     @Override
113     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
114         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
115     }
116
117     @Override
118     public void close() {
119         state = CLOSED_STATE;
120
121         // Send a close transaction chain request to each and every shard
122         actorContext.broadcast(new CloseTransactionChain(transactionChainId));
123     }
124
125     private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
126         checkReadyState();
127
128         ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type);
129         STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated());
130
131         return txProxy;
132     }
133
134     private void checkReadyState() {
135         Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet",
136                 state.getReadyFutures().getKey());
137     }
138
139     private class ChainedTransactionProxy extends TransactionProxy {
140
141         ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
142             super(actorContext, transactionType, transactionChainId);
143         }
144
145         @Override
146         protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
147             state.setReadyFutures(getIdentifier(), readyFutures);
148         }
149
150         /**
151          * This method is overridden to ensure the previous Tx's ready operations complete
152          * before we create the next shard Tx in the chain to avoid creation failures if the
153          * previous Tx's ready operations haven't completed yet.
154          */
155         @Override
156         protected Future<Object> sendCreateTransaction(final ActorSelection shard,
157                 final Object serializedCreateMessage) {
158
159             // Check if there are any previous ready Futures, otherwise let the super class handle it.
160             // The second check is done to ensure the the previous ready Futures aren't for this
161             // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can
162             // occur in this scenario:
163             //
164             //     - the TransactionProxy is created and the client does a write.
165             //
166             //     - the TransactionProxy then attempts to create the shard Tx. However it first
167             //       sends a FindPrimaryShard message to the shard manager to find the local shard
168             //       This call is done async.
169             //
170             //     - the client submits the Tx and the TransactionProxy is readied and we cache
171             //       the ready Futures here.
172             //
173             //     - then the FindPrimaryShard call completes and this method is called to create
174             //       the shard Tx. However the cached Futures were from the ready on this Tx. If we
175             //       tried to wait on them, it would cause a form of deadlock as the ready Future
176             //       would be waiting on the Tx create Future and vice versa.
177             SimpleEntry<Object, List<Future<ActorSelection>>> readyFuturesEntry = state.getReadyFutures();
178             List<Future<ActorSelection>> readyFutures = readyFuturesEntry.getValue();
179             if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) {
180                 return super.sendCreateTransaction(shard, serializedCreateMessage);
181             }
182
183             // Combine the ready Futures into 1.
184             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
185                     readyFutures, actorContext.getActorSystem().dispatcher());
186
187             // Add a callback for completion of the combined Futures.
188             final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
189             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
190                 @Override
191                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
192                     if(failure != null) {
193                         // A Ready Future failed so fail the returned Promise.
194                         createTxPromise.failure(failure);
195                     } else {
196                         // Send the CreateTx message and use the resulting Future to complete the
197                         // returned Promise.
198                         createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
199                                 serializedCreateMessage));
200                     }
201                 }
202             };
203
204             combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
205
206             return createTxPromise.future();
207         }
208     }
209 }