*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import java.util.Collection;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
*/
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
-
- protected static final AtomicLong TX_COUNTER = new AtomicLong();
+ @SuppressWarnings("rawtypes")
+ private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
- private final ActorContext actorContext;
+ private final @NonNull LocalHistoryIdentifier historyId;
+ private final @NonNull ActorUtils actorUtils;
+
+ // Used via TX_COUNTER_UPDATER
+ @SuppressWarnings("unused")
+ private volatile long nextTx;
+
+ protected AbstractTransactionContextFactory(final ActorUtils actorUtils, final LocalHistoryIdentifier historyId) {
+ this.actorUtils = requireNonNull(actorUtils);
+ this.historyId = requireNonNull(historyId);
+ }
- protected AbstractTransactionContextFactory(final ActorContext actorContext) {
- this.actorContext = Preconditions.checkNotNull(actorContext);
+ final ActorUtils getActorUtils() {
+ return actorUtils;
}
- final ActorContext getActorContext() {
- return actorContext;
+ final LocalHistoryIdentifier getHistoryId() {
+ return historyId;
}
- private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
+ final String shardName) {
final LocalTransactionFactory local = knownLocal.get(shardName);
if (local != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
- parent.getIdentifier(), shardName, local);
+ LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
+ shardName, local);
+
+ try {
+ return createLocalTransactionContext(local, parent);
+ } catch (Exception e) {
+ return new NoOpTransactionContext(e, parent.getIdentifier());
}
- return createLocalTransactionContext(local, parent);
}
return null;
}
- private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
- primaryShardInfo.getPrimaryShardActor(), shardName);
+ private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
+ final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
+ LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
+ parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
+
+ updateShardInfo(shardName, primaryShardInfo);
+
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+ try {
+ if (localContext != null) {
+ LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
+ parent.getIdentifier());
+ return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
+ localContext);
+ }
+
+ LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
+ parent.getIdentifier());
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
+ remote.setPrimaryShard(primaryShardInfo);
+ return transactionContextWrapper;
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
}
+ }
+
+ private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
+ LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
- TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
- if(localContext != null) {
- transactionContextWrapper.executePriorTransactionOperations(localContext);
- } else {
- RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
- parent, shardName);
- remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+ try {
+ if (localContext != null) {
+ transactionContextWrapper.executePriorTransactionOperations(localContext);
+ } else {
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
+ remote.setPrimaryShard(primaryShardInfo);
+ }
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
}
}
- private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
+ private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
- transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getIdentifier()));
+ try {
+ transactionContextWrapper.executePriorTransactionOperations(
+ new NoOpTransactionContext(failure, parent.getIdentifier()));
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
+ }
}
- final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
- final TransactionContextWrapper transactionContextWrapper =
- new TransactionContextWrapper(parent.getIdentifier(), actorContext);
-
- Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
- if(findPrimaryFuture.isCompleted()) {
- Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
- if(maybe.isSuccess()) {
- onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
- } else {
- onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
+ final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+ final String shardName) {
+ final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
+ parent.getIdentifier(), actorUtils, shardName);
+ final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
+ if (findPrimaryFuture.isCompleted()) {
+ final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
+ if (maybe.isSuccess()) {
+ return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, contextWrapper);
}
+
+ onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, contextWrapper);
} else {
- findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
- @Override
- public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
- if (failure == null) {
- onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
- } else {
- onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
- }
+ findPrimaryFuture.onComplete(result -> {
+ if (result.isSuccess()) {
+ onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
+ } else {
+ onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
}
- }, actorContext.getClientDispatcher());
+ return null;
+ }, actorUtils.getClientDispatcher());
}
-
- return transactionContextWrapper;
+ return contextWrapper;
}
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
- final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
+ final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
if (maybeDataTree.isPresent()) {
- if(!knownLocal.containsKey(shardName)) {
+ if (!knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
knownLocal.putIfAbsent(shardName, factory);
}
- } else if(knownLocal.containsKey(shardName)) {
+ } else if (knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} invalidating local data tree", shardName);
knownLocal.remove(shardName);
}
}
- protected String getMemberName() {
- String memberName = getActorContext().getCurrentMemberName();
- if (memberName == null) {
- memberName = "UNKNOWN-MEMBER";
- }
-
- return memberName;
+ protected final MemberName getMemberName() {
+ return historyId.getClientId().getFrontendId().getMemberName();
}
/**
* factory.
* @return Transaction identifier, may not be null.
*/
- protected abstract TransactionIdentifier nextIdentifier();
+ protected final TransactionIdentifier nextIdentifier() {
+ return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
+ }
/**
* Find the primary shard actor.
* @param shardName Shard name
* @return Future containing shard information.
*/
- protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
+ protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
+ @NonNull TransactionIdentifier txId);
/**
* Create local transaction factory for specified shard, backed by specified shard leader
* and data tree instance.
*
- * @param shardName
- * @param shardLeader
+ * @param shardName the shard name
+ * @param shardLeader the shard leader
* @param dataTree Backing data tree instance. The data tree may only be accessed in
* read-only manner.
* @return Transaction factory for local use.
*/
- protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
+ protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
/**
* Callback invoked from child transactions to push any futures, which need to
* be waited for before the next transaction is allocated.
* @param cohortFutures Collection of futures
*/
- protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
+ protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
+ @NonNull Collection<Future<T>> cohortFutures);
+
+ /**
+ * Callback invoked when the internal TransactionContext has been created for a transaction.
+ *
+ * @param transactionId the ID of the transaction.
+ */
+ protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
final TransactionProxy parent) {
- switch(parent.getType()) {
+ switch (parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
@Override
- protected DOMStoreWriteTransaction getWriteDelegate() {
+ DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
}
@Override
- protected DOMStoreReadTransaction getReadDelegate() {
+ DOMStoreReadTransaction getReadDelegate() {
return readOnly;
}
};
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
@Override
- protected DOMStoreWriteTransaction getWriteDelegate() {
+ DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
}
@Override
- protected DOMStoreReadTransaction getReadDelegate() {
+ DOMStoreReadTransaction getReadDelegate() {
return readWrite;
}
};
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
@Override
- protected DOMStoreWriteTransaction getWriteDelegate() {
+ DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
}
@Override
- protected DOMStoreReadTransaction getReadDelegate() {
+ DOMStoreReadTransaction getReadDelegate() {
throw new UnsupportedOperationException();
}
};
- default:
- throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+ default:
+ throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
}
}
}