import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayDeque;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayDeque;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
final Shard shard) {
log.debug("{}: Readying transaction {}, client version {}", name,
void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
final Shard shard) {
log.debug("{}: Readying transaction {}, client version {}", name,
* @param sender the sender of the message
*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
* @param sender the sender of the message
*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
- CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
- if (cohortEntry == null) {
- cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()),
+ CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
+ if (cohortEntry == null || cohortEntry.isSealed()) {
+ cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
}
if (log.isDebugEnabled()) {
log.debug("{}: Applying {} batched modifications for Tx {}", name,
}
if (log.isDebugEnabled()) {
log.debug("{}: Applying {} batched modifications for Tx {}", name,
}
cohortEntry.applyModifications(batched.getModifications());
if (batched.isReady()) {
if (cohortEntry.getLastBatchedModificationsException() != null) {
}
cohortEntry.applyModifications(batched.getModifications());
if (batched.isReady()) {
if (cohortEntry.getLastBatchedModificationsException() != null) {
throw cohortEntry.getLastBatchedModificationsException();
}
if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
throw cohortEntry.getLastBatchedModificationsException();
}
if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
throw new IllegalStateException(String.format(
"The total number of batched messages received %d does not match the number sent %d",
cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
}
if (log.isDebugEnabled()) {
throw new IllegalStateException(String.format(
"The total number of batched messages received %d does not match the number sent %d",
cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
}
if (log.isDebugEnabled()) {
- log.debug("{}: Readying Tx {}, client version {}", name,
- batched.getTransactionID(), batched.getVersion());
+ log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
+ batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
* @param shard the transaction's shard actor
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
* @param shard the transaction's shard actor
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
- final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(),
- message.getModification());
+ final TransactionIdentifier txId = message.getTransactionId();
+ final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+ message.getParticipatingShardNames());
Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
final int maxModificationsPerBatch) {
Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
final int maxModificationsPerBatch) {
if (cohortEntry == null || cohortEntry.getTransaction() == null) {
return Collections.singletonList(from);
}
if (cohortEntry == null || cohortEntry.getTransaction() == null) {
return Collections.singletonList(from);
}
protected BatchedModifications getModifications() {
if (newModifications.isEmpty()
|| newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
protected BatchedModifications getModifications() {
if (newModifications.isEmpty()
|| newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
BatchedModifications last = newModifications.getLast();
last.setDoCommitOnReady(from.isDoCommitOnReady());
BatchedModifications last = newModifications.getLast();
last.setDoCommitOnReady(from.isDoCommitOnReady());
- log.debug("{}: An exception occurred during canCommit for {}: {}", name,
- cohortEntry.getTransactionID(), failure);
+ log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+ failure);
// between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
// between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
- private void doCommit(final CohortEntry cohortEntry) {
- log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
+ void doCommit(final CohortEntry cohortEntry) {
+ log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
// We perform the preCommit phase here atomically with the commit phase. This is an
// optimization to eliminate the overhead of an extra preCommit message. We lose front-end
// We perform the preCommit phase here atomically with the commit phase. This is an
// optimization to eliminate the overhead of an extra preCommit message. We lose front-end
@Override
public void onFailure(final Throwable failure) {
log.error("{} An exception occurred while preCommitting transaction {}", name,
@Override
public void onFailure(final Throwable failure) {
log.error("{} An exception occurred while preCommitting transaction {}", name,
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
- log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+ void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
+ log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
sender);
log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
sender);
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
cohortEntry.getShard().self());
}
@Override
public void onFailure(final Throwable failure) {
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
cohortEntry.getShard().self());
}
@Override
public void onFailure(final Throwable failure) {
- log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- cohortEntry.getTransactionID(), failure);
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
+ log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
// or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
// or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
log.debug("{}: Aborting transaction {}", name, transactionID);
final ActorRef self = shard.getSelf();
log.debug("{}: Aborting transaction {}", name, transactionID);
final ActorRef self = shard.getSelf();
- Iterator<CohortEntry> iter = cohortCache.values().iterator();
- while (iter.hasNext()) {
- CohortEntry cohortEntry = iter.next();
- if (cohortEntry.isFailed()) {
- iter.remove();
- }
- }
+ cohortCache.values().removeIf(CohortEntry::isFailed);