2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.AbstractMap.SimpleEntry;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.atomic.AtomicInteger;
18 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.concurrent.Promise;
32 * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
34 public class TransactionChainProxy implements DOMStoreTransactionChain {
36 private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
38 private interface State {
41 SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures();
43 void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures);
46 private static class Allocated implements State {
47 private volatile SimpleEntry<Object, List<Future<ActorSelection>>> readyFutures;
50 public boolean isReady() {
51 return readyFutures != null;
55 public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
56 return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES;
60 public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
61 this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures);
65 private static abstract class AbstractDefaultState implements State {
67 public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
68 return EMPTY_READY_FUTURES;
72 public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
73 throw new IllegalStateException("No transaction is allocated");
77 private static final State IDLE_STATE = new AbstractDefaultState() {
79 public boolean isReady() {
84 private static final State CLOSED_STATE = new AbstractDefaultState() {
86 public boolean isReady() {
87 throw new TransactionChainClosedException("Transaction chain has been closed");
91 private static final SimpleEntry<Object, List<Future<ActorSelection>>> EMPTY_READY_FUTURES =
92 new SimpleEntry<Object, List<Future<ActorSelection>>>("",
93 Collections.<Future<ActorSelection>>emptyList());
95 private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
96 AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state");
98 private final ActorContext actorContext;
99 private final String transactionChainId;
100 private volatile State state = IDLE_STATE;
101 private static final AtomicInteger counter = new AtomicInteger(0);
103 public TransactionChainProxy(ActorContext actorContext) {
104 this.actorContext = actorContext;
105 transactionChainId = actorContext.getCurrentMemberName() + "-transaction-chain-" + counter.incrementAndGet();
108 public String getTransactionChainId() {
109 return transactionChainId;
113 public DOMStoreReadTransaction newReadOnlyTransaction() {
115 return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
119 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
120 return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
124 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
125 return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
129 public void close() {
130 state = CLOSED_STATE;
132 // Send a close transaction chain request to each and every shard
133 actorContext.broadcast(new CloseTransactionChain(transactionChainId));
136 private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
139 ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type);
140 STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated());
145 private void checkReadyState() {
146 Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet",
147 state.getReadyFutures().getKey());
150 private class ChainedTransactionProxy extends TransactionProxy {
152 ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
153 super(actorContext, transactionType, transactionChainId);
157 protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
158 LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), readyFutures.size(), TransactionChainProxy.this.transactionChainId);
159 state.setReadyFutures(getIdentifier(), readyFutures);
163 * This method is overridden to ensure the previous Tx's ready operations complete
164 * before we create the next shard Tx in the chain to avoid creation failures if the
165 * previous Tx's ready operations haven't completed yet.
168 protected Future<Object> sendCreateTransaction(final ActorSelection shard,
169 final Object serializedCreateMessage) {
171 // Check if there are any previous ready Futures, otherwise let the super class handle it.
172 // The second check is done to ensure the the previous ready Futures aren't for this
173 // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can
174 // occur in this scenario:
176 // - the TransactionProxy is created and the client does a write.
178 // - the TransactionProxy then attempts to create the shard Tx. However it first
179 // sends a FindPrimaryShard message to the shard manager to find the local shard
180 // This call is done async.
182 // - the client submits the Tx and the TransactionProxy is readied and we cache
183 // the ready Futures here.
185 // - then the FindPrimaryShard call completes and this method is called to create
186 // the shard Tx. However the cached Futures were from the ready on this Tx. If we
187 // tried to wait on them, it would cause a form of deadlock as the ready Future
188 // would be waiting on the Tx create Future and vice versa.
189 SimpleEntry<Object, List<Future<ActorSelection>>> readyFuturesEntry = state.getReadyFutures();
190 List<Future<ActorSelection>> readyFutures = readyFuturesEntry.getValue();
191 if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) {
192 return super.sendCreateTransaction(shard, serializedCreateMessage);
195 // Combine the ready Futures into 1.
196 Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
197 readyFutures, actorContext.getActorSystem().dispatcher());
199 // Add a callback for completion of the combined Futures.
200 final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
201 OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
203 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
204 if(failure != null) {
205 // A Ready Future failed so fail the returned Promise.
206 createTxPromise.failure(failure);
208 // Send the CreateTx message and use the resulting Future to complete the
210 createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
211 serializedCreateMessage));
216 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
218 return createTxPromise.future();