From 42b44eaf035beeebb794277db8ede1db3caa9ab1 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 Apr 2015 17:13:30 +0200 Subject: [PATCH] CDS: Split out TransactionFutureCallback This moves the TransactionFutureCallback inner class out of TransactionProxy, so its interaction with TransactionProxy is defined using methods. Change-Id: Ib07bf91e32074b721552c8ef4952bbd40369283a Signed-off-by: Robert Varga --- .../datastore/TransactionFutureCallback.java | 264 ++++++++++++++++ .../cluster/datastore/TransactionProxy.java | 293 +++--------------- 2 files changed, 304 insertions(+), 253 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java new file mode 100644 index 0000000000..a8a93c5e7c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import akka.dispatch.OnComplete; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * Implements a Future OnComplete callback for a CreateTransaction message. This class handles + * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a + * retry task after a short delay. + *

+ * The end result from a completed CreateTransaction message is a TransactionContext that is + * used to perform transaction operations. Transaction operations that occur before the + * CreateTransaction completes are cache and executed once the CreateTransaction completes, + * successfully or not. + */ +final class TransactionFutureCallback extends OnComplete { + private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class); + + /** + * Time interval in between transaction create retries. + */ + private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS); + + /** + * The list of transaction operations to execute once the CreateTransaction completes. + */ + @GuardedBy("txOperationsOnComplete") + private final List txOperationsOnComplete = Lists.newArrayList(); + private final TransactionProxy proxy; + private final String shardName; + + /** + * The TransactionContext resulting from the CreateTransaction reply. + */ + private volatile TransactionContext transactionContext; + + /** + * The target primary shard. + */ + private volatile ActorSelection primaryShard; + private volatile int createTxTries; + + TransactionFutureCallback(final TransactionProxy proxy, final String shardName) { + this.proxy = Preconditions.checkNotNull(proxy); + this.shardName = shardName; + createTxTries = (int) (proxy.getActorContext().getDatastoreContext(). + getShardLeaderElectionTimeout().duration().toMillis() / + CREATE_TX_TRY_INTERVAL.toMillis()); + } + + String getShardName() { + return shardName; + } + + TransactionContext getTransactionContext() { + return transactionContext; + } + + private TransactionType getTransactionType() { + return proxy.getTransactionType(); + } + + private TransactionIdentifier getIdentifier() { + return proxy.getIdentifier(); + } + + private ActorContext getActorContext() { + return proxy.getActorContext(); + } + + private Semaphore getOperationLimiter() { + return proxy.getOperationLimiter(); + } + + /** + * Sets the target primary shard and initiates a CreateTransaction try. + */ + void setPrimaryShard(ActorSelection primaryShard) { + this.primaryShard = primaryShard; + + if (getTransactionType() == TransactionType.WRITE_ONLY && + getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", + getIdentifier(), primaryShard); + + // For write-only Tx's we prepare the transaction modifications directly on the shard actor + // to avoid the overhead of creating a separate transaction actor. + // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. + executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard, + this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); + } else { + tryCreateTransaction(); + } + } + + /** + * Adds a TransactionOperation to be executed after the CreateTransaction completes. + */ + private void addTxOperationOnComplete(TransactionOperation operation) { + boolean invokeOperation = true; + synchronized(txOperationsOnComplete) { + if(transactionContext == null) { + LOG.debug("Tx {} Adding operation on complete", getIdentifier()); + + invokeOperation = false; + txOperationsOnComplete.add(operation); + } + } + + if(invokeOperation) { + operation.invoke(transactionContext); + } + } + + void enqueueTransactionOperation(final TransactionOperation op) { + + if (transactionContext != null) { + op.invoke(transactionContext); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + addTxOperationOnComplete(op); + } + } + + /** + * Performs a CreateTransaction try async. + */ + private void tryCreateTransaction() { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard); + } + + Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), + getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable(); + + Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage); + + createTxFuture.onComplete(this, getActorContext().getClientDispatcher()); + } + + @Override + public void onComplete(Throwable failure, Object response) { + if(failure instanceof NoShardLeaderException) { + // There's no leader for the shard yet - schedule and try again, unless we're out + // of retries. Note: createTxTries is volatile as it may be written by different + // threads however not concurrently, therefore decrementing it non-atomically here + // is ok. + if(--createTxTries > 0) { + LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", + getIdentifier(), shardName); + + getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, + new Runnable() { + @Override + public void run() { + tryCreateTransaction(); + } + }, getActorContext().getClientDispatcher()); + return; + } + } + + createTransactionContext(failure, response); + } + + void createTransactionContext(Throwable failure, Object response) { + // Mainly checking for state violation here to perform a volatile read of "initialized" to + // ensure updates to operationLimter et al are visible to this thread (ie we're doing + // "piggy-back" synchronization here). + proxy.ensureInitializied(); + + // Create the TransactionContext from the response or failure. Store the new + // TransactionContext locally until we've completed invoking the + // TransactionOperations. This avoids thread timing issues which could cause + // out-of-order TransactionOperations. Eg, on a modification operation, if the + // TransactionContext is non-null, then we directly call the TransactionContext. + // However, at the same time, the code may be executing the cached + // TransactionOperations. So to avoid thus timing, we don't publish the + // TransactionContext until after we've executed all cached TransactionOperations. + TransactionContext localTransactionContext; + if(failure != null) { + LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); + + localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter()); + } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { + localTransactionContext = createValidTransactionContext( + CreateTransactionReply.fromSerializable(response)); + } else { + IllegalArgumentException exception = new IllegalArgumentException(String.format( + "Invalid reply type %s for CreateTransaction", response.getClass())); + + localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter()); + } + + executeTxOperatonsOnComplete(localTransactionContext); + } + + private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) { + while(true) { + // Access to txOperationsOnComplete and transactionContext must be protected and atomic + // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing + // issues and ensure no TransactionOperation is missed and that they are processed + // in the order they occurred. + + // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy + // in case a TransactionOperation results in another transaction operation being + // queued (eg a put operation from a client read Future callback that is notified + // synchronously). + Collection operationsBatch = null; + synchronized(txOperationsOnComplete) { + if(txOperationsOnComplete.isEmpty()) { + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; + break; + } + + operationsBatch = new ArrayList<>(txOperationsOnComplete); + txOperationsOnComplete.clear(); + } + + // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. + // A slight down-side is that we need to re-acquire the lock below but this should + // be negligible. + for(TransactionOperation oper: operationsBatch) { + oper.invoke(localTransactionContext); + } + } + } + + private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { + LOG.debug("Tx {} Received {}", getIdentifier(), reply); + + return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()), + reply.getTransactionPath(), reply.getVersion()); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 388dd9f4bd..a0987cd5d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -20,7 +20,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,12 +29,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; -import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; @@ -50,7 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard @@ -101,12 +95,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction data) { @@ -444,6 +436,14 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName); + final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); @@ -474,7 +474,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction - * The end result from a completed CreateTransaction message is a TransactionContext that is - * used to perform transaction operations. Transaction operations that occur before the - * CreateTransaction completes are cache and executed once the CreateTransaction completes, - * successfully or not. - */ - private class TransactionFutureCallback extends OnComplete { - - /** - * The list of transaction operations to execute once the CreateTransaction completes. - */ - @GuardedBy("txOperationsOnComplete") - private final List txOperationsOnComplete = Lists.newArrayList(); - - /** - * The TransactionContext resulting from the CreateTransaction reply. - */ - private volatile TransactionContext transactionContext; - - /** - * The target primary shard. - */ - private volatile ActorSelection primaryShard; - - private volatile int createTxTries = (int) (actorContext.getDatastoreContext(). - getShardLeaderElectionTimeout().duration().toMillis() / - CREATE_TX_TRY_INTERVAL.toMillis()); - - private final String shardName; - - TransactionFutureCallback(String shardName) { - this.shardName = shardName; - } - - String getShardName() { - return shardName; - } - - TransactionContext getTransactionContext() { - return transactionContext; - } - - - /** - * Sets the target primary shard and initiates a CreateTransaction try. - */ - void setPrimaryShard(ActorSelection primaryShard) { - this.primaryShard = primaryShard; - - if(transactionType == TransactionType.WRITE_ONLY && - actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { - LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", - getIdentifier(), primaryShard); - - // For write-only Tx's we prepare the transaction modifications directly on the shard actor - // to avoid the overhead of creating a separate transaction actor. - // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. - executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard, - this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); - } else { - tryCreateTransaction(); - } - } - - /** - * Adds a TransactionOperation to be executed after the CreateTransaction completes. - */ - void addTxOperationOnComplete(TransactionOperation operation) { - boolean invokeOperation = true; - synchronized(txOperationsOnComplete) { - if(transactionContext == null) { - LOG.debug("Tx {} Adding operation on complete", getIdentifier()); - - invokeOperation = false; - txOperationsOnComplete.add(operation); - } - } - - if(invokeOperation) { - operation.invoke(transactionContext); - } - } - - void enqueueTransactionOperation(final TransactionOperation op) { - - if (transactionContext != null) { - op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - addTxOperationOnComplete(op); - } - } - - /** - * Performs a CreateTransaction try async. - */ - private void tryCreateTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard); - } - - Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), - TransactionProxy.this.transactionType.ordinal(), - getTransactionChainId()).toSerializable(); - - Future createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage); - - createTxFuture.onComplete(this, actorContext.getClientDispatcher()); - } - - @Override - public void onComplete(Throwable failure, Object response) { - if(failure instanceof NoShardLeaderException) { - // There's no leader for the shard yet - schedule and try again, unless we're out - // of retries. Note: createTxTries is volatile as it may be written by different - // threads however not concurrently, therefore decrementing it non-atomically here - // is ok. - if(--createTxTries > 0) { - LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", - getIdentifier(), shardName); - - actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, - new Runnable() { - @Override - public void run() { - tryCreateTransaction(); - } - }, actorContext.getClientDispatcher()); - return; - } - } - - createTransactionContext(failure, response); - } - - private void createTransactionContext(Throwable failure, Object response) { - // Mainly checking for state violation here to perform a volatile read of "initialized" to - // ensure updates to operationLimter et al are visible to this thread (ie we're doing - // "piggy-back" synchronization here). - Preconditions.checkState(initialized, "Tx was not propertly initialized."); - - // Create the TransactionContext from the response or failure. Store the new - // TransactionContext locally until we've completed invoking the - // TransactionOperations. This avoids thread timing issues which could cause - // out-of-order TransactionOperations. Eg, on a modification operation, if the - // TransactionContext is non-null, then we directly call the TransactionContext. - // However, at the same time, the code may be executing the cached - // TransactionOperations. So to avoid thus timing, we don't publish the - // TransactionContext until after we've executed all cached TransactionOperations. - TransactionContext localTransactionContext; - if(failure != null) { - LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); - - localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter); - } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { - localTransactionContext = createValidTransactionContext( - CreateTransactionReply.fromSerializable(response)); - } else { - IllegalArgumentException exception = new IllegalArgumentException(String.format( - "Invalid reply type %s for CreateTransaction", response.getClass())); - - localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter); - } + TransactionContext createValidTransactionContext(ActorSelection transactionActor, + String transactionPath, short remoteTransactionVersion) { - executeTxOperatonsOnComplete(localTransactionContext); - } + if (transactionType == TransactionType.READ_ONLY) { + // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference + // to close the remote Tx's when this instance is no longer in use and is garbage + // collected. - private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) { - while(true) { - // Access to txOperationsOnComplete and transactionContext must be protected and atomic - // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing - // issues and ensure no TransactionOperation is missed and that they are processed - // in the order they occurred. - - // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy - // in case a TransactionOperation results in another transaction operation being - // queued (eg a put operation from a client read Future callback that is notified - // synchronously). - Collection operationsBatch = null; - synchronized(txOperationsOnComplete) { - if(txOperationsOnComplete.isEmpty()) { - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - transactionContext = localTransactionContext; - break; - } - - operationsBatch = new ArrayList<>(txOperationsOnComplete); - txOperationsOnComplete.clear(); - } + if(remoteTransactionActorsMB == null) { + remoteTransactionActors = Lists.newArrayList(); + remoteTransactionActorsMB = new AtomicBoolean(); - // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. - // A slight down-side is that we need to re-acquire the lock below but this should - // be negligible. - for(TransactionOperation oper: operationsBatch) { - oper.invoke(localTransactionContext); - } + TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); } - } - private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { - LOG.debug("Tx {} Received {}", getIdentifier(), reply); + // Add the actor to the remoteTransactionActors list for access by the + // cleanup PhantonReference. + remoteTransactionActors.add(transactionActor); - return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()), - reply.getTransactionPath(), reply.getVersion()); + // Write to the memory barrier volatile to publish the above update to the + // remoteTransactionActors list for thread visibility. + remoteTransactionActorsMB.set(true); } - private TransactionContext createValidTransactionContext(ActorSelection transactionActor, - String transactionPath, short remoteTransactionVersion) { - - if (transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - if(remoteTransactionActorsMB == null) { - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); + // TxActor is always created where the leader of the shard is. + // Check if TxActor is created in the same node + boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); - } - - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); - - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); - } - - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - - if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, - operationCompleter); - } else { - return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, - actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); - } + if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { + return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), + transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, + operationCompleter); + } else { + return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, + actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); } } } -- 2.36.6