Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Convert CDS implementation to use msdal APIs
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
AbstractTransactionContextFactory.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
index 225f3c201f5c5df7c6f5d41d988f7d266f1141fe..70a014f2a46f30907190bc768b38d2ab968f51e6 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
@@
-14,9
+14,11
@@
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLong
FieldUpdater
;
import javax.annotation.Nonnull;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@
-34,31
+36,43
@@
import scala.util.Try;
*/
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
*/
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
-
- protected static final AtomicLong TX_COUNTER = new AtomicLong();
+ @SuppressWarnings("rawtypes")
+ private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
+ private final LocalHistoryIdentifier historyId;
private final ActorContext actorContext;
private final ActorContext actorContext;
- protected AbstractTransactionContextFactory(final ActorContext actorContext) {
+ // Used via TX_COUNTER_UPDATER
+ @SuppressWarnings("unused")
+ private volatile long nextTx;
+
+ protected AbstractTransactionContextFactory(final ActorContext actorContext,
+ final LocalHistoryIdentifier historyId) {
this.actorContext = Preconditions.checkNotNull(actorContext);
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.historyId = Preconditions.checkNotNull(historyId);
}
final ActorContext getActorContext() {
return actorContext;
}
}
final ActorContext getActorContext() {
return actorContext;
}
- private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
+ final LocalHistoryIdentifier getHistoryId() {
+ return historyId;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
+ final String shardName) {
final LocalTransactionFactory local = knownLocal.get(shardName);
if (local != null) {
final LocalTransactionFactory local = knownLocal.get(shardName);
if (local != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
- parent.getIdentifier(), shardName, local);
- }
+ LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
+ shardName, local);
try {
return createLocalTransactionContext(local, parent);
try {
return createLocalTransactionContext(local, parent);
- } catch(Exception e) {
+ } catch
(Exception e) {
return new NoOpTransactionContext(e, parent.getIdentifier());
}
}
return new NoOpTransactionContext(e, parent.getIdentifier());
}
}
@@
-68,16
+82,14
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
String shardName, TransactionContextWrapper transactionContextWrapper) {
private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
String shardName, TransactionContextWrapper transactionContextWrapper) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
- primaryShardInfo.getPrimaryShardActor(), shardName);
- }
+ LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
try {
TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
updateShardInfo(shardName, primaryShardInfo);
try {
TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
- if(localContext != null) {
+ if
(localContext != null) {
transactionContextWrapper.executePriorTransactionOperations(localContext);
} else {
RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
transactionContextWrapper.executePriorTransactionOperations(localContext);
} else {
RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
@@
-101,14
+113,15
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
}
}
}
}
- final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
+ final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+ final String shardName) {
final TransactionContextWrapper transactionContextWrapper =
final TransactionContextWrapper transactionContextWrapper =
- new TransactionContextWrapper(parent.getIdentifier(), actorContext);
+ new TransactionContextWrapper(parent.getIdentifier(), actorContext
, shardName
);
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
- if(findPrimaryFuture.isCompleted()) {
+ if
(findPrimaryFuture.isCompleted()) {
Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
- if(maybe.isSuccess()) {
+ if
(maybe.isSuccess()) {
onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
} else {
onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
} else {
onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
@@
-132,26
+145,21
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
if (maybeDataTree.isPresent()) {
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
if (maybeDataTree.isPresent()) {
- if(!knownLocal.containsKey(shardName)) {
+ if
(!knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
knownLocal.putIfAbsent(shardName, factory);
}
LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
knownLocal.putIfAbsent(shardName, factory);
}
- } else if(knownLocal.containsKey(shardName)) {
+ } else if
(knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} invalidating local data tree", shardName);
knownLocal.remove(shardName);
}
}
LOG.debug("Shard {} invalidating local data tree", shardName);
knownLocal.remove(shardName);
}
}
- protected String getMemberName() {
- String memberName = getActorContext().getCurrentMemberName();
- if (memberName == null) {
- memberName = "UNKNOWN-MEMBER";
- }
-
- return memberName;
+ protected final MemberName getMemberName() {
+ return historyId.getClientId().getFrontendId().getMemberName();
}
/**
}
/**
@@
-159,7
+167,9
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
* factory.
* @return Transaction identifier, may not be null.
*/
* factory.
* @return Transaction identifier, may not be null.
*/
- protected abstract TransactionIdentifier nextIdentifier();
+ protected final TransactionIdentifier nextIdentifier() {
+ return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
+ }
/**
* Find the primary shard actor.
/**
* Find the primary shard actor.
@@
-174,8
+184,8
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
* Create local transaction factory for specified shard, backed by specified shard leader
* and data tree instance.
*
* Create local transaction factory for specified shard, backed by specified shard leader
* and data tree instance.
*
- * @param shardName
- * @param shardLeader
+ * @param shardName
the shard name
+ * @param shardLeader
the shard leader
* @param dataTree Backing data tree instance. The data tree may only be accessed in
* read-only manner.
* @return Transaction factory for local use.
* @param dataTree Backing data tree instance. The data tree may only be accessed in
* read-only manner.
* @return Transaction factory for local use.
@@
-187,7
+197,8
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
* be waited for before the next transaction is allocated.
* @param cohortFutures Collection of futures
*/
* be waited for before the next transaction is allocated.
* @param cohortFutures Collection of futures
*/
- protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
+ protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction,
+ @Nonnull Collection<Future<T>> cohortFutures);
/**
* Callback invoked when the internal TransactionContext has been created for a transaction.
/**
* Callback invoked when the internal TransactionContext has been created for a transaction.
@@
-199,7
+210,7
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
final TransactionProxy parent) {
private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
final TransactionProxy parent) {
- switch(parent.getType()) {
+ switch
(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
@@
-239,8
+250,8
@@
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
throw new UnsupportedOperationException();
}
};
throw new UnsupportedOperationException();
}
};
-
default:
-
throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+ default:
+ throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
}
}
}
}
}
}