X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=f18271ae36671c342d7b5172a9e40c6b6c662d2b;hb=refs%2Fchanges%2F83%2F48683%2F4;hp=89fa8fbc2507fc1f25d8de01cabb48a0cfd4359d;hpb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 89fa8fbc25..f18271ae36 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -12,8 +12,14 @@ import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.primitives.UnsignedLong;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
@@ -27,15 +33,19 @@ import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -68,6 +78,7 @@ import scala.concurrent.duration.Duration;
* e.g. it does not expose public interfaces and assumes it is only ever called from a
* single thread.
*
+ *
* This class is not part of the API contract and is subject to change at any time.
*/
@NotThreadSafe
@@ -90,6 +101,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
private final Queue pendingTransactions = new ArrayDeque<>();
private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
+ private final Collection> metadata;
private final TipProducingDataTree dataTree;
private final String logContext;
private final Shard shard;
@@ -99,14 +111,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
- final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
- this.dataTree = dataTree;
+ final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+ final ShardDataTreeMetadata>... metadata) {
+ this.dataTree = Preconditions.checkNotNull(dataTree);
updateSchemaContext(schemaContext);
this.shard = Preconditions.checkNotNull(shard);
this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
this.logContext = Preconditions.checkNotNull(logContext);
+ this.metadata = ImmutableList.copyOf(metadata);
}
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
@@ -134,26 +148,227 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return schemaContext;
}
- void updateSchemaContext(final SchemaContext schemaContext) {
- dataTree.setSchemaContext(schemaContext);
- this.schemaContext = Preconditions.checkNotNull(schemaContext);
+ void updateSchemaContext(final SchemaContext newSchemaContext) {
+ dataTree.setSchemaContext(newSchemaContext);
+ this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
}
- ShardDataTreeSnapshot takeRecoverySnapshot() {
- return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
+ /**
+ * Take a snapshot of current state for later recovery.
+ *
+ * @return A state snapshot
+ */
+ @Nonnull ShardDataTreeSnapshot takeStateSnapshot() {
+ final NormalizedNode, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get();
+ final Builder>, ShardDataTreeSnapshotMetadata>> metaBuilder =
+ ImmutableMap.builder();
+
+ for (ShardDataTreeMetadata> m : metadata) {
+ final ShardDataTreeSnapshotMetadata> meta = m.toSnapshot();
+ if (meta != null) {
+ metaBuilder.put(meta.getType(), meta);
+ }
+ }
+
+ return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
}
- void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
- // FIXME: purge any outstanding transactions
+ private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
+ final UnaryOperator wrapper) throws DataValidationFailedException {
+ final Stopwatch elapsed = Stopwatch.createStarted();
- final DataTreeModification snapshot = transaction.getSnapshot();
- snapshot.ready();
+ if (!pendingTransactions.isEmpty()) {
+ LOG.warn("{}: applying state snapshot with pending transactions", logContext);
+ }
+
+ final Map>, ShardDataTreeSnapshotMetadata>> snapshotMeta;
+ if (snapshot instanceof MetadataShardDataTreeSnapshot) {
+ snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata();
+ } else {
+ snapshotMeta = ImmutableMap.of();
+ }
+
+ for (ShardDataTreeMetadata> m : metadata) {
+ final ShardDataTreeSnapshotMetadata> s = snapshotMeta.get(m.getSupportedType());
+ if (s != null) {
+ m.applySnapshot(s);
+ } else {
+ m.reset();
+ }
+ }
+
+ final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification());
+ // delete everything first
+ mod.delete(YangInstanceIdentifier.EMPTY);
+
+ final java.util.Optional> maybeNode = snapshot.getRootNode();
+ if (maybeNode.isPresent()) {
+ // Add everything from the remote node back
+ mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+ }
+ mod.ready();
+
+ final DataTreeModification unwrapped = unwrap(mod);
+ dataTree.validate(unwrapped);
+ DataTreeCandidateTip candidate = dataTree.prepare(unwrapped);
+ dataTree.commit(candidate);
+ notifyListeners(candidate);
+
+ LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+ }
+
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
+ private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
+ return new PruningDataTreeModification(delegate, dataTree, schemaContext);
+ }
+
+ private static DataTreeModification unwrap(final DataTreeModification modification) {
+ if (modification instanceof PruningDataTreeModification) {
+ return ((PruningDataTreeModification)modification).delegate();
+ }
+ return modification;
+ }
- dataTree.validate(snapshot);
- dataTree.commit(dataTree.prepare(snapshot));
+ /**
+ * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
+ * pruning in an attempt to adjust the state to our current SchemaContext.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, this::wrapWithPruning);
}
- private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+ final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
+ DataTreeCandidates.applyToModification(mod, candidate);
+ mod.ready();
+
+ final DataTreeModification unwrapped = mod.delegate();
+ LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
+
+ try {
+ dataTree.validate(unwrapped);
+ dataTree.commit(dataTree.prepare(unwrapped));
+ } catch (Exception e) {
+ File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-payload-" + logContext + ".out");
+ DataTreeModificationOutput.toFile(file, unwrapped);
+ throw new IllegalStateException(String.format(
+ "%s: Failed to apply recovery payload. Modification data was written to file %s",
+ logContext, file), e);
+ }
+ }
+
+ /**
+ * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
+ * pruning in an attempt to adjust the state to our current SchemaContext.
+ *
+ * @param payload Payload
+ * @throws IOException when the snapshot fails to deserialize
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+ if (payload instanceof CommitTransactionPayload) {
+ final Entry e =
+ ((CommitTransactionPayload) payload).getCandidate();
+ applyRecoveryCandidate(e.getValue());
+ allMetadataCommittedTransaction(e.getKey());
+ } else if (payload instanceof DataTreeCandidatePayload) {
+ applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
+ } else {
+ LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
+ }
+ }
+
+ private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+ throws DataValidationFailedException {
+ LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+
+ final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+ DataTreeCandidates.applyToModification(mod, foreign);
+ mod.ready();
+
+ LOG.trace("{}: Applying foreign modification {}", logContext, mod);
+ dataTree.validate(mod);
+ final DataTreeCandidate candidate = dataTree.prepare(mod);
+ dataTree.commit(candidate);
+
+ notifyListeners(candidate);
+ }
+
+ /**
+ * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower
+ * SchemaContexts match and does not perform any pruning.
+ *
+ * @param identifier Payload identifier as returned from RaftActor
+ * @param payload Payload
+ * @throws IOException when the snapshot fails to deserialize
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+ DataValidationFailedException {
+ /*
+ * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
+ * if we are the leader and it has originated with us.
+ *
+ * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately,
+ * though, this may not be the case anymore, as we are being called some time afterwards and we may not be
+ * acting in that capacity anymore.
+ *
+ * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
+ * pre-Boron state -- which limits the number of options here.
+ */
+ if (payload instanceof CommitTransactionPayload) {
+ if (identifier == null) {
+ final Entry e =
+ ((CommitTransactionPayload) payload).getCandidate();
+ applyReplicatedCandidate(e.getKey(), e.getValue());
+ allMetadataCommittedTransaction(e.getKey());
+ } else {
+ Verify.verify(identifier instanceof TransactionIdentifier);
+ payloadReplicationComplete((TransactionIdentifier) identifier);
+ }
+ } else {
+ LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
+ }
+ }
+
+ private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ final CommitEntry current = pendingTransactions.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ return;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ return;
+ }
+
+ finishCommit(current.cohort);
+ }
+
+ private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata> m : metadata) {
+ m.onTransactionCommitted(txId);
+ }
+ }
+
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
@@ -180,6 +395,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
}
+ @VisibleForTesting
public void notifyListeners(final DataTreeCandidate candidate) {
treeChangeListenerPublisher.publishChanges(candidate, logContext);
dataChangeListenerPublisher.publishChanges(candidate, logContext);
@@ -225,22 +441,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
Optional> registerChangeListener(final YangInstanceIdentifier path,
final AsyncDataChangeListener> listener,
final DataChangeScope scope) {
- final DataChangeListenerRegistration>> reg =
+ DataChangeListenerRegistration>> reg =
dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
return new SimpleEntry<>(reg, readCurrentData());
}
private Optional readCurrentData() {
- final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+ final Optional> currentState =
+ dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent();
}
- public Entry, Optional> registerTreeChangeListener(
- final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
- final ListenerRegistration reg = treeChangeListenerPublisher.registerTreeChangeListener(
- path, listener);
+ public Entry, Optional>
+ registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration reg =
+ treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
return new SimpleEntry<>(reg, readCurrentData());
}
@@ -249,20 +466,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return pendingTransactions.size();
}
- void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
- LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
-
- final DataTreeModification mod = dataTree.takeSnapshot().newModification();
- DataTreeCandidates.applyToModification(mod, foreign);
- mod.ready();
-
- LOG.trace("{}: Applying foreign modification {}", logContext, mod);
- dataTree.validate(mod);
- final DataTreeCandidate candidate = dataTree.prepare(mod);
- dataTree.commit(candidate);
- notifyListeners(candidate);
- }
-
@Override
void abortTransaction(final AbstractShardDataTreeTransaction> transaction) {
// Intentional no-op
@@ -273,34 +476,40 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return createReadyCohort(transaction.getId(), snapshot);
+ return createReadyCohort(transaction.getIdentifier(), snapshot);
}
public Optional> readNode(final YangInstanceIdentifier path) {
return dataTree.takeSnapshot().readNode(path);
}
- public DataTreeSnapshot takeSnapshot() {
+ DataTreeSnapshot takeSnapshot() {
return dataTree.takeSnapshot();
}
+ @VisibleForTesting
public DataTreeModification newModification() {
return dataTree.takeSnapshot().newModification();
}
+ /**
+ * Commits a modification.
+ *
+ * @deprecated This method violates DataTree containment and will be removed.
+ */
@VisibleForTesting
- // FIXME: This should be removed, it violates encapsulation
+ @Deprecated
public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
modification.ready();
dataTree.validate(modification);
- DataTreeCandidateTip candidate = dataTree.prepare(modification);
+ DataTreeCandidate candidate = dataTree.prepare(modification);
dataTree.commit(candidate);
return candidate;
}
public Collection getAndClearPendingTransactions() {
Collection ret = new ArrayList<>(pendingTransactions.size());
- for(CommitEntry entry: pendingTransactions) {
+ for (CommitEntry entry: pendingTransactions) {
ret.add(entry.cohort);
}
@@ -308,13 +517,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return ret;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void processNextTransaction() {
while (!pendingTransactions.isEmpty()) {
final CommitEntry entry = pendingTransactions.peek();
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+ if (cohort.getState() != State.CAN_COMMIT_PENDING) {
break;
}
@@ -336,7 +546,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
+ dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation.", e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -366,6 +577,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
@@ -397,6 +609,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
final TransactionIdentifier txId = cohort.getIdentifier();
final DataTreeCandidate candidate = cohort.getCandidate();
@@ -404,24 +617,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
try {
- try {
- dataTree.commit(candidate);
- } catch (IllegalStateException e) {
- // We may get a "store tree and candidate base differ" IllegalStateException from commit under
- // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
- // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
- // applying it to the state. We then become the leader and a second tx is pre-committed and
- // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
- // candidate via applyState prior to the second tx. Since the second tx has already been
- // pre-committed, when it gets here to commit it will get an IllegalStateException.
-
- // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
- // solution will be forthcoming.
-
- LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
- applyForeignCandidate(txId, candidate);
- }
+ dataTree.commit(candidate);
} catch (Exception e) {
+ LOG.error("{}: Failed to commit transaction {}", logContext, txId, e);
failCommit(e);
return;
}
@@ -430,7 +628,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
// FIXME: propagate journal index
-
pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
@@ -468,32 +665,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
}
- private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
- final CommitEntry current = pendingTransactions.peek();
- if (current == null) {
- LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
- return;
- }
-
- if (!current.cohort.getIdentifier().equals(txId)) {
- LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
- current.cohort.getIdentifier(), txId);
- return;
- }
-
- finishCommit(current.cohort);
- }
-
- void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
- // For now we do not care about anything else but transactions
- Verify.verify(identifier instanceof TransactionIdentifier);
- payloadReplicationComplete((TransactionIdentifier)identifier, payload);
- }
-
void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
cohortRegistry.process(sender, message);
}
+ @Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
final DataTreeModification modification) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
@@ -502,11 +678,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return cohort;
}
- void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
- throws DataValidationFailedException, IOException {
- applyForeignCandidate(identifier, payload.getCandidate().getValue());
- }
-
+ @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"},
+ justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = shard.ticker().read();
@@ -520,6 +693,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
break;
case CAN_COMMIT_COMPLETE:
+ // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+ // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+ // in PRE_COMMIT_COMPLETE is changed.
pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
break;
case PRE_COMMIT_PENDING:
@@ -556,6 +732,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
case FAILED:
case READY:
default:
+ // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
+ // this case, we just want to drop the current entry that expired and thus ignore the return value.
+ // In fact we really shouldn't hit this case but we handle all enums for completeness.
pendingTransactions.poll();
}
@@ -565,6 +744,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "See inline comment below.")
void startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator it = pendingTransactions.iterator();
if (!it.hasNext()) {
@@ -578,6 +758,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
if (cohort.getState() != State.COMMIT_PENDING) {
LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
cohort.getIdentifier());
+
+ // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
+ // this case, we've already obtained the head of the queue above via the Iterator and we just want to
+ // remove it here.
pendingTransactions.poll();
processNextTransaction();
} else {
@@ -605,12 +789,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
private void maybeRunOperationOnPendingTransactionsComplete() {
- if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
- LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
- runOnPendingTransactionsComplete);
-
- runOnPendingTransactionsComplete.run();
- runOnPendingTransactionsComplete = null;
- }
- }
+ if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
}