X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;h=af19d14c00cddf2ff5750bc43dbfc5031f425ce5;hb=04f22714755ee8e52a63386cc1f4290402659838;hp=dd27b4e629d2b593bc37e717ad52446e6b769eb2;hpb=ba87ed620f13823ee798fda4241a2c1db37e2f33;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index dd27b4e629..af19d14c00 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -7,22 +7,35 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; +import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE; + import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; -import com.google.common.io.ByteArrayDataOutput; +import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; +import java.io.StreamCorruptedException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; -import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; +import org.opendaylight.yangtools.concepts.Variant; +import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Payload persisted when a transaction commits. It contains the transaction identifier and the @@ -31,66 +44,169 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; * @author Robert Varga */ @Beta -public final class CommitTransactionPayload extends Payload implements DataTreeCandidateSupplier, Serializable { - private static final class Proxy implements Externalizable { +public abstract class CommitTransactionPayload extends IdentifiablePayload + implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); + private static final long serialVersionUID = 1L; + + private volatile Entry candidate = null; + + CommitTransactionPayload() { + + } + + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity) + throws IOException { + final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity); + try (DataOutputStream dos = new DataOutputStream(cos)) { + transactionId.writeTo(dos); + DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate); + } + + final Variant source = cos.toVariant(); + LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); + return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); + } + + @VisibleForTesting + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate, final PayloadVersion version) throws IOException { + return create(transactionId, candidate, version, 512); + } + + @VisibleForTesting + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate) throws IOException { + return create(transactionId, candidate, PayloadVersion.current()); + } + + public @NonNull Entry getCandidate() throws IOException { + Entry localCandidate = candidate; + if (localCandidate == null) { + synchronized (this) { + localCandidate = candidate; + if (localCandidate == null) { + candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + } + } + } + return localCandidate; + } + + public final @NonNull Entry getCandidate( + final ReusableStreamReceiver receiver) throws IOException { + final DataInput in = newDataInput(); + return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), + DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); + } + + @Override + public TransactionIdentifier getIdentifier() { + try { + return getCandidate().getKey(); + } catch (IOException e) { + throw new IllegalStateException("Candidate deserialization failed.", e); + } + } + + abstract void writeBytes(ObjectOutput out) throws IOException; + + abstract DataInput newDataInput(); + + final Object writeReplace() { + return new Proxy(this); + } + + private static final class Simple extends CommitTransactionPayload { private static final long serialVersionUID = 1L; - private byte[] serialized; - public Proxy() { - // For Externalizable + private final byte[] serialized; + + Simple(final byte[] serialized) { + this.serialized = requireNonNull(serialized); } - Proxy(final byte[] serialized) { - this.serialized = Preconditions.checkNotNull(serialized); + @Override + public int size() { + return serialized.length; } @Override - public void writeExternal(final ObjectOutput out) throws IOException { - out.writeInt(serialized.length); + DataInput newDataInput() { + return ByteStreams.newDataInput(serialized); + } + + @Override + void writeBytes(final ObjectOutput out) throws IOException { out.write(serialized); } + } + + private static final class Chunked extends CommitTransactionPayload { + private static final long serialVersionUID = 1L; + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy") + private final ChunkedByteArray source; + + Chunked(final ChunkedByteArray source) { + this.source = requireNonNull(source); + } @Override - public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { - final int length = in.readInt(); - serialized = new byte[length]; - in.readFully(serialized); + void writeBytes(final ObjectOutput out) throws IOException { + source.copyTo(out); } - private Object readResolve() { - return new CommitTransactionPayload(serialized); + @Override + public int size() { + return source.size(); + } + + @Override + DataInput newDataInput() { + return new DataInputStream(source.openStream()); } } - private static final long serialVersionUID = 1L; + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; - private final byte[] serialized; + private CommitTransactionPayload payload; - CommitTransactionPayload(final byte[] serialized) { - this.serialized = Preconditions.checkNotNull(serialized); - } + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } - public static CommitTransactionPayload create(final TransactionIdentifier transactionId, - final DataTreeCandidate candidate) throws IOException { - final ByteArrayDataOutput out = ByteStreams.newDataOutput(); - transactionId.writeTo(out); - DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate); - return new CommitTransactionPayload(out.toByteArray()); - } + Proxy(final CommitTransactionPayload payload) { + this.payload = requireNonNull(payload); + } - @Override - public Entry, DataTreeCandidate> getCandidate() throws IOException { - final DataInput in = ByteStreams.newDataInput(serialized); - return new SimpleImmutableEntry<>(Optional.of(TransactionIdentifier.readFrom(in)), - DataTreeCandidateInputOutput.readDataTreeCandidate(in)); - } + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeInt(payload.size()); + payload.writeBytes(out); + } - @Override - public int size() { - return serialized.length; - } + @Override + public void readExternal(final ObjectInput in) throws IOException { + final int length = in.readInt(); + if (length < 0) { + throw new StreamCorruptedException("Invalid payload length " + length); + } else if (length < MAX_ARRAY_SIZE) { + final byte[] serialized = new byte[length]; + in.readFully(serialized); + payload = new Simple(serialized); + } else { + payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE)); + } + } - private Object writeReplace() { - return new Proxy(serialized); + private Object readResolve() { + return verifyNotNull(payload); + } } }