/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.concurrent.Promise;
30 * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
32 public class TransactionChainProxy implements DOMStoreTransactionChain {
34 private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
36 private interface State {
39 List<Future<ActorSelection>> getPreviousReadyFutures();
42 private static class Allocated implements State {
43 private final ChainedTransactionProxy transaction;
45 Allocated(ChainedTransactionProxy transaction) {
46 this.transaction = transaction;
50 public boolean isReady() {
51 return transaction.isReady();
55 public List<Future<ActorSelection>> getPreviousReadyFutures() {
56 return transaction.getReadyFutures();
60 private static abstract class AbstractDefaultState implements State {
62 public List<Future<ActorSelection>> getPreviousReadyFutures() {
63 return Collections.emptyList();
67 private static final State IDLE_STATE = new AbstractDefaultState() {
69 public boolean isReady() {
74 private static final State CLOSED_STATE = new AbstractDefaultState() {
76 public boolean isReady() {
77 throw new TransactionChainClosedException("Transaction chain has been closed");
81 private static final AtomicInteger counter = new AtomicInteger(0);
83 private final ActorContext actorContext;
84 private final String transactionChainId;
85 private volatile State currentState = IDLE_STATE;
87 public TransactionChainProxy(ActorContext actorContext) {
88 this.actorContext = actorContext;
89 transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet();
92 public String getTransactionChainId() {
93 return transactionChainId;
97 public DOMStoreReadTransaction newReadOnlyTransaction() {
98 State localState = currentState;
99 checkReadyState(localState);
101 return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
102 transactionChainId, localState.getPreviousReadyFutures());
106 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
107 actorContext.acquireTxCreationPermit();
108 return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
112 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
113 actorContext.acquireTxCreationPermit();
114 return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
118 public void close() {
119 currentState = CLOSED_STATE;
121 // Send a close transaction chain request to each and every shard
122 actorContext.broadcast(new CloseTransactionChain(transactionChainId));
125 private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
126 State localState = currentState;
128 checkReadyState(localState);
130 // Pass the ready Futures from the previous Tx.
131 ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type,
132 transactionChainId, localState.getPreviousReadyFutures());
134 currentState = new Allocated(txProxy);
139 private void checkReadyState(State state) {
140 Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
143 private static class ChainedTransactionProxy extends TransactionProxy {
146 * Stores the ready Futures from the previous Tx in the chain.
148 private final List<Future<ActorSelection>> previousReadyFutures;
151 * Stores the ready Futures from this transaction when it is readied.
153 private volatile List<Future<ActorSelection>> readyFutures;
155 private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
156 String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
157 super(actorContext, transactionType, transactionChainId);
158 this.previousReadyFutures = previousReadyFutures;
161 List<Future<ActorSelection>> getReadyFutures() {
166 return readyFutures != null;
170 protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
171 LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
172 readyFutures.size(), getTransactionChainId());
173 this.readyFutures = readyFutures;
177 * This method is overridden to ensure the previous Tx's ready operations complete
178 * before we create the next shard Tx in the chain to avoid creation failures if the
179 * previous Tx's ready operations haven't completed yet.
182 protected Future<Object> sendCreateTransaction(final ActorSelection shard,
183 final Object serializedCreateMessage) {
185 // Check if there are any previous ready Futures, otherwise let the super class handle it.
186 if(previousReadyFutures.isEmpty()) {
187 return super.sendCreateTransaction(shard, serializedCreateMessage);
190 // Combine the ready Futures into 1.
191 Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
192 previousReadyFutures, getActorContext().getActorSystem().dispatcher());
194 // Add a callback for completion of the combined Futures.
195 final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
196 OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
198 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
199 if(failure != null) {
200 // A Ready Future failed so fail the returned Promise.
201 createTxPromise.failure(failure);
203 LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
204 getIdentifier(), getTransactionChainId());
206 // Send the CreateTx message and use the resulting Future to complete the
208 createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
209 serializedCreateMessage));
214 combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
216 return createTxPromise.future();