import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.Collection;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
*/
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
-
- protected static final AtomicLong TX_COUNTER = new AtomicLong();
+ @SuppressWarnings("rawtypes")
+ private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
+ private final LocalHistoryIdentifier historyId;
private final ActorContext actorContext;
- protected AbstractTransactionContextFactory(final ActorContext actorContext) {
+ // Used via TX_COUNTER_UPDATER
+ @SuppressWarnings("unused")
+ private volatile long nextTx;
+
+ protected AbstractTransactionContextFactory(final ActorContext actorContext,
+ final LocalHistoryIdentifier historyId) {
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.historyId = Preconditions.checkNotNull(historyId);
}
final ActorContext getActorContext() {
return actorContext;
}
- private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
+ final LocalHistoryIdentifier getHistoryId() {
+ return historyId;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
+ final String shardName) {
final LocalTransactionFactory local = knownLocal.get(shardName);
if (local != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
- parent.getIdentifier(), shardName, local);
- }
+ LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
+ shardName, local);
try {
return createLocalTransactionContext(local, parent);
- } catch(Exception e) {
+ } catch (Exception e) {
return new NoOpTransactionContext(e, parent.getIdentifier());
}
}
private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
String shardName, TransactionContextWrapper transactionContextWrapper) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
- primaryShardInfo.getPrimaryShardActor(), shardName);
- }
+ LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
try {
TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
- if(localContext != null) {
+ if (localContext != null) {
transactionContextWrapper.executePriorTransactionOperations(localContext);
} else {
RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
parent, shardName);
- remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
+ remote.setPrimaryShard(primaryShardInfo);
}
} finally {
onTransactionContextCreated(parent.getIdentifier());
}
}
- final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
+ final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+ final String shardName) {
final TransactionContextWrapper transactionContextWrapper =
new TransactionContextWrapper(parent.getIdentifier(), actorContext);
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
- if(findPrimaryFuture.isCompleted()) {
+ if (findPrimaryFuture.isCompleted()) {
Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
- if(maybe.isSuccess()) {
+ if (maybe.isSuccess()) {
onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
} else {
onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
if (maybeDataTree.isPresent()) {
- if(!knownLocal.containsKey(shardName)) {
+ if (!knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
knownLocal.putIfAbsent(shardName, factory);
}
- } else if(knownLocal.containsKey(shardName)) {
+ } else if (knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} invalidating local data tree", shardName);
knownLocal.remove(shardName);
}
}
- protected String getMemberName() {
- String memberName = getActorContext().getCurrentMemberName();
- if (memberName == null) {
- memberName = "UNKNOWN-MEMBER";
- }
-
- return memberName;
+ protected final MemberName getMemberName() {
+ return historyId.getClientId().getFrontendId().getMemberName();
}
/**
* factory.
* @return Transaction identifier, may not be null.
*/
- protected abstract TransactionIdentifier nextIdentifier();
+ protected final TransactionIdentifier nextIdentifier() {
+ return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
+ }
/**
* Find the primary shard actor.
* Create local transaction factory for specified shard, backed by specified shard leader
* and data tree instance.
*
- * @param shardName
- * @param shardLeader
+ * @param shardName the shard name
+ * @param shardLeader the shard leader
* @param dataTree Backing data tree instance. The data tree may only be accessed in
* read-only manner.
* @return Transaction factory for local use.
* be waited for before the next transaction is allocated.
* @param cohortFutures Collection of futures
*/
- protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
+ protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction,
+ @Nonnull Collection<Future<T>> cohortFutures);
/**
* Callback invoked when the internal TransactionContext has been created for a transaction.
private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
final TransactionProxy parent) {
- switch(parent.getType()) {
+ switch (parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
throw new UnsupportedOperationException();
}
};
- default:
- throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+ default:
+ throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
}
}
}