2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.AbstractMap.SimpleEntry;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
19 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
25 import scala.concurrent.Future;
26 import scala.concurrent.Promise;
29 * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
31 public class TransactionChainProxy implements DOMStoreTransactionChain {
32 private interface State {
35 SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures();
37 void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures);
40 private static class Allocated implements State {
41 private volatile SimpleEntry<Object, List<Future<ActorSelection>>> readyFutures;
44 public boolean isReady() {
45 return readyFutures != null;
49 public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
50 return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES;
54 public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
55 this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures);
59 private static abstract class AbstractDefaultState implements State {
61 public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
62 return EMPTY_READY_FUTURES;
66 public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
67 throw new IllegalStateException("No transaction is allocated");
71 private static final State IDLE_STATE = new AbstractDefaultState() {
73 public boolean isReady() {
78 private static final State CLOSED_STATE = new AbstractDefaultState() {
80 public boolean isReady() {
81 throw new TransactionChainClosedException("Transaction chain has been closed");
85 private static final SimpleEntry<Object, List<Future<ActorSelection>>> EMPTY_READY_FUTURES =
86 new SimpleEntry<Object, List<Future<ActorSelection>>>("",
87 Collections.<Future<ActorSelection>>emptyList());
89 private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
90 AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state");
92 private final ActorContext actorContext;
93 private final String transactionChainId;
94 private volatile State state = IDLE_STATE;
96 public TransactionChainProxy(ActorContext actorContext) {
97 this.actorContext = actorContext;
98 transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis();
102 public DOMStoreReadTransaction newReadOnlyTransaction() {
104 return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
108 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
109 return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
113 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
114 return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
118 public void close() {
119 state = CLOSED_STATE;
121 // Send a close transaction chain request to each and every shard
122 actorContext.broadcast(new CloseTransactionChain(transactionChainId));
125 private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
128 ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type);
129 STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated());
134 private void checkReadyState() {
135 Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet",
136 state.getReadyFutures().getKey());
139 private class ChainedTransactionProxy extends TransactionProxy {
/**
 * Creates a transaction proxy bound to the enclosing chain's identifier.
 *
 * @param actorContext context passed through to the base class
 * @param transactionType the kind of transaction to create
 */
ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
    super(actorContext, transactionType, transactionChainId);
}
146 protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
147 state.setReadyFutures(getIdentifier(), readyFutures);
151 * This method is overridden to ensure the previous Tx's ready operations complete
152 * before we create the next shard Tx in the chain to avoid creation failures if the
153 * previous Tx's ready operations haven't completed yet.
156 protected Future<Object> sendCreateTransaction(final ActorSelection shard,
157 final Object serializedCreateMessage) {
159 // Check if there are any previous ready Futures, otherwise let the super class handle it.
160 // The second check is done to ensure the the previous ready Futures aren't for this
161 // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can
162 // occur in this scenario:
164 // - the TransactionProxy is created and the client does a write.
166 // - the TransactionProxy then attempts to create the shard Tx. However it first
167 // sends a FindPrimaryShard message to the shard manager to find the local shard
168 // This call is done async.
170 // - the client submits the Tx and the TransactionProxy is readied and we cache
171 // the ready Futures here.
173 // - then the FindPrimaryShard call completes and this method is called to create
174 // the shard Tx. However the cached Futures were from the ready on this Tx. If we
175 // tried to wait on them, it would cause a form of deadlock as the ready Future
176 // would be waiting on the Tx create Future and vice versa.
177 SimpleEntry<Object, List<Future<ActorSelection>>> readyFuturesEntry = state.getReadyFutures();
178 List<Future<ActorSelection>> readyFutures = readyFuturesEntry.getValue();
179 if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) {
180 return super.sendCreateTransaction(shard, serializedCreateMessage);
183 // Combine the ready Futures into 1.
184 Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
185 readyFutures, actorContext.getActorSystem().dispatcher());
187 // Add a callback for completion of the combined Futures.
188 final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
189 OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
191 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
192 if(failure != null) {
193 // A Ready Future failed so fail the returned Promise.
194 createTxPromise.failure(failure);
196 // Send the CreateTx message and use the resulting Future to complete the
198 createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
199 serializedCreateMessage));
204 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
206 return createTxPromise.future();