import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
@SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
- final DataTreeModification unwrapped = newModification();
- final PruningDataTreeModification mod = createPruningModification(unwrapped,
- NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().version()) > 0);
+ final var entry = payload.acquireCandidate();
+ final var unwrapped = newModification();
+ final var pruningMod = createPruningModification(unwrapped,
+ NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.streamVersion()) > 0);
- DataTreeCandidates.applyToModification(mod, entry.getValue().candidate());
- mod.ready();
+ DataTreeCandidates.applyToModification(pruningMod, entry.candidate());
+ pruningMod.ready();
LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
try {
dataTree.validate(unwrapped);
dataTree.commit(dataTree.prepare(unwrapped));
} catch (Exception e) {
- File file = new File(System.getProperty("karaf.data", "."),
+ final var file = new File(System.getProperty("karaf.data", "."),
"failed-recovery-payload-" + logContext + ".out");
DataTreeModificationOutput.toFile(file, unwrapped);
- throw new IllegalStateException(String.format(
- "%s: Failed to apply recovery payload. Modification data was written to file %s",
- logContext, file), e);
+ throw new IllegalStateException(
+ "%s: Failed to apply recovery payload. Modification data was written to file %s".formatted(
+ logContext, file),
+ e);
}
- allMetadataCommittedTransaction(entry.getKey());
+ allMetadataCommittedTransaction(entry.transactionId());
}
private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
final boolean uintAdapting) {
// TODO: we should be able to reuse the pruner, provided we are not reentrant
- final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
- dataSchemaContext);
+ final var pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext);
return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
: new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
}
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
- final TransactionIdentifier identifier = entry.getKey();
- LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+ final var payloadCandidate = payload.acquireCandidate();
+ final var transactionId = payloadCandidate.transactionId();
+ LOG.debug("{}: Applying foreign transaction {}", logContext, transactionId);
- final DataTreeModification mod = newModification();
+ final var mod = newModification();
// TODO: check version here, which will enable us to perform forward-compatibility transformations
- DataTreeCandidates.applyToModification(mod, entry.getValue().candidate());
+ DataTreeCandidates.applyToModification(mod, payloadCandidate.candidate());
mod.ready();
LOG.trace("{}: Applying foreign modification {}", logContext, mod);
dataTree.validate(mod);
- final DataTreeCandidate candidate = dataTree.prepare(mod);
+ final var candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
- allMetadataCommittedTransaction(identifier);
+ allMetadataCommittedTransaction(transactionId);
notifyListeners(candidate);
}
}
// make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
- checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue().candidate());
+ checkRootOverwrite(commit.acquireCandidate().candidate());
} else if (payload instanceof AbortTransactionPayload abort) {
if (identifier != null) {
payloadReplicationComplete(abort);
for (var entry : entries) {
final var data = entry.getData();
if (data instanceof CommitTransactionPayload payload) {
- final var candidate = payload.getCandidate().getValue().candidate();
+ final var candidate = payload.getCandidate().candidate();
writeNode(jsonWriter, candidate);
} else {
jsonWriter.beginObject().name("Payload").value(data.toString()).endObject();
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.Serializable;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Map.Entry;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
import org.opendaylight.controller.cluster.io.ChunkedByteArray;
import org.opendaylight.controller.cluster.io.ChunkedOutputStream;
import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
-import org.opendaylight.yangtools.concepts.Either;
import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
import org.slf4j.Logger;
@Beta
public abstract sealed class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
implements Serializable {
+ @NonNullByDefault
+ public record CandidateTransaction(
+ TransactionIdentifier transactionId,
+ DataTreeCandidate candidate,
+ NormalizedNodeStreamVersion streamVersion) {
+ public CandidateTransaction {
+ requireNonNull(transactionId);
+ requireNonNull(candidate);
+ requireNonNull(streamVersion);
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
private static final long serialVersionUID = 1L;
static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
"org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
- private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
+ private volatile CandidateTransaction candidate = null;
// Private so only the nested implementations declared in this sealed class can extend it.
private CommitTransactionPayload() {
    // hidden on purpose
}
public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity)
throws IOException {
- final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
- try (DataOutputStream dos = new DataOutputStream(cos)) {
+ final var cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
+ try (var dos = new DataOutputStream(cos)) {
transactionId.writeTo(dos);
DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate);
}
- final Either<byte[], ChunkedByteArray> source = cos.toVariant();
+ final var source = cos.toVariant();
LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size());
return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond());
}
return create(transactionId, candidate, PayloadVersion.current());
}
- public @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate() throws IOException {
- Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = candidate;
+ public @NonNull CandidateTransaction getCandidate() throws IOException {
+ var localCandidate = candidate;
if (localCandidate == null) {
synchronized (this) {
localCandidate = candidate;
return localCandidate;
}
- public final @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate(
- final ReusableStreamReceiver receiver) throws IOException {
- final DataInput in = newDataInput();
- return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
- DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
+ public final @NonNull CandidateTransaction getCandidate(final ReusableStreamReceiver receiver) throws IOException {
+ final var in = newDataInput();
+ final var transactionId = TransactionIdentifier.readFrom(in);
+ final var readCandidate = DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver);
+
+ return new CandidateTransaction(transactionId, readCandidate.candidate(), readCandidate.version());
}
@Override
public TransactionIdentifier getIdentifier() {
try {
- return getCandidate().getKey();
+ return getCandidate().transactionId();
} catch (IOException e) {
throw new IllegalStateException("Candidate deserialization failed.", e);
}
* deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
* this was the last time the candidate was needed ant it is safe to be cleared.
*/
- public Entry<TransactionIdentifier, DataTreeCandidateWithVersion> acquireCandidate() throws IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = getCandidate();
+ public @NonNull CandidateTransaction acquireCandidate() throws IOException {
+ final var localCandidate = getCandidate();
candidate = null;
return localCandidate;
}
final var helper = MoreObjects.toStringHelper(this);
final var localCandidate = candidate;
if (localCandidate != null) {
- helper.add("identifier", candidate.getKey());
+ helper.add("identifier", candidate.transactionId());
}
return helper.add("size", size()).toString();
}
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload.CandidateTransaction;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.common.QName;
}
}
- private static void assertCandidateEquals(final DataTreeCandidate expected,
- final DataTreeCandidateWithVersion actual) {
+ private static void assertCandidateEquals(final DataTreeCandidate expected, final CandidateTransaction actual) {
final var candidate = actual.candidate();
assertEquals("root path", expected.getRootPath(), candidate.getRootPath());
assertCandidateNodeEquals(expected.getRootNode(), candidate.getRootNode());
@Test
public void testCandidateSerDes() throws IOException {
final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@Test
public void testPayloadSerDes() throws IOException {
final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue());
+ assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate());
}
@Test
.withValue("one")
.build());
CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@Test
.build())
.build());
CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@Test
.build())
.build());
CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@Test
candidate = DataTreeCandidates.fromNormalizedNode(leafPath,
ImmutableNodes.leafNode(TestModel.DESC_QNAME, "test"));
CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@Test
candidate = dataTree.prepare(modification);
CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
}