Fix a raw Dictionary reference
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.AbstractMap.SimpleEntry;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.atomic.AtomicInteger;
18 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.concurrent.Promise;
30
31 /**
32  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
33  */
34 public class TransactionChainProxy implements DOMStoreTransactionChain {
35
36     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
37
38     private interface State {
39         boolean isReady();
40
41         SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures();
42
43         void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures);
44     }
45
46     private static class Allocated implements State {
47         private volatile SimpleEntry<Object, List<Future<ActorSelection>>> readyFutures;
48
49         @Override
50         public boolean isReady() {
51             return readyFutures != null;
52         }
53
54         @Override
55         public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
56             return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES;
57         }
58
59         @Override
60         public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
61             this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures);
62         }
63     }
64
65     private static abstract class AbstractDefaultState implements State {
66         @Override
67         public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
68             return EMPTY_READY_FUTURES;
69         }
70
71         @Override
72         public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
73             throw new IllegalStateException("No transaction is allocated");
74         }
75     }
76
77     private static final State IDLE_STATE = new AbstractDefaultState() {
78         @Override
79         public boolean isReady() {
80             return true;
81         }
82     };
83
84     private static final State CLOSED_STATE = new AbstractDefaultState() {
85         @Override
86         public boolean isReady() {
87             throw new TransactionChainClosedException("Transaction chain has been closed");
88         }
89     };
90
91     private static final SimpleEntry<Object, List<Future<ActorSelection>>> EMPTY_READY_FUTURES =
92             new SimpleEntry<Object, List<Future<ActorSelection>>>("",
93                     Collections.<Future<ActorSelection>>emptyList());
94
95     private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
96             AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state");
97
98     private final ActorContext actorContext;
99     private final String transactionChainId;
100     private volatile State state = IDLE_STATE;
101     private static final AtomicInteger counter = new AtomicInteger(0);
102
103     public TransactionChainProxy(ActorContext actorContext) {
104         this.actorContext = actorContext;
105         transactionChainId = actorContext.getCurrentMemberName() + "-transaction-chain-" + counter.incrementAndGet();
106     }
107
108     public String getTransactionChainId() {
109         return transactionChainId;
110     }
111
112     @Override
113     public DOMStoreReadTransaction newReadOnlyTransaction() {
114         checkReadyState();
115         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
116     }
117
118     @Override
119     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
120         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
121     }
122
123     @Override
124     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
125         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
126     }
127
128     @Override
129     public void close() {
130         state = CLOSED_STATE;
131
132         // Send a close transaction chain request to each and every shard
133         actorContext.broadcast(new CloseTransactionChain(transactionChainId));
134     }
135
136     private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
137         checkReadyState();
138
139         ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type);
140         STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated());
141
142         return txProxy;
143     }
144
145     private void checkReadyState() {
146         Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet",
147                 state.getReadyFutures().getKey());
148     }
149
150     private class ChainedTransactionProxy extends TransactionProxy {
151
152         ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
153             super(actorContext, transactionType, transactionChainId);
154         }
155
156         @Override
157         protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
158             LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), readyFutures.size(), TransactionChainProxy.this.transactionChainId);
159             state.setReadyFutures(getIdentifier(), readyFutures);
160         }
161
162         /**
163          * This method is overridden to ensure the previous Tx's ready operations complete
164          * before we create the next shard Tx in the chain to avoid creation failures if the
165          * previous Tx's ready operations haven't completed yet.
166          */
167         @Override
168         protected Future<Object> sendCreateTransaction(final ActorSelection shard,
169                 final Object serializedCreateMessage) {
170
171             // Check if there are any previous ready Futures, otherwise let the super class handle it.
172             // The second check is done to ensure the the previous ready Futures aren't for this
173             // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can
174             // occur in this scenario:
175             //
176             //     - the TransactionProxy is created and the client does a write.
177             //
178             //     - the TransactionProxy then attempts to create the shard Tx. However it first
179             //       sends a FindPrimaryShard message to the shard manager to find the local shard
180             //       This call is done async.
181             //
182             //     - the client submits the Tx and the TransactionProxy is readied and we cache
183             //       the ready Futures here.
184             //
185             //     - then the FindPrimaryShard call completes and this method is called to create
186             //       the shard Tx. However the cached Futures were from the ready on this Tx. If we
187             //       tried to wait on them, it would cause a form of deadlock as the ready Future
188             //       would be waiting on the Tx create Future and vice versa.
189             SimpleEntry<Object, List<Future<ActorSelection>>> readyFuturesEntry = state.getReadyFutures();
190             List<Future<ActorSelection>> readyFutures = readyFuturesEntry.getValue();
191             if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) {
192                 return super.sendCreateTransaction(shard, serializedCreateMessage);
193             }
194
195             // Combine the ready Futures into 1.
196             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
197                     readyFutures, actorContext.getActorSystem().dispatcher());
198
199             // Add a callback for completion of the combined Futures.
200             final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
201             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
202                 @Override
203                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
204                     if(failure != null) {
205                         // A Ready Future failed so fail the returned Promise.
206                         createTxPromise.failure(failure);
207                     } else {
208                         // Send the CreateTx message and use the resulting Future to complete the
209                         // returned Promise.
210                         createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
211                                 serializedCreateMessage));
212                     }
213                 }
214             };
215
216             combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
217
218             return createTxPromise.future();
219         }
220     }
221 }