*
* @author Thomas Pantelis
*/
-// Noo-final for mocking
+// Non-final for mocking
public class DatastoreContext implements ClientActorConfig {
public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
+ public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
+ private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
this.requestTimeout = other.requestTimeout;
this.noProgressTimeout = other.noProgressTimeout;
+ this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return noProgressTimeout;
}
+ /**
+  * Returns the initial buffer capacity, in bytes, to use when serializing message payloads.
+  */
+ public int getInitialPayloadSerializedBufferCapacity() {
+ return initialPayloadSerializedBufferCapacity;
+ }
+
public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
+ /**
+  * Sets the initial buffer capacity, in bytes, to use when serializing message payloads.
+  *
+  * @param capacity the initial buffer capacity in bytes
+  * @return this Builder
+  */
+ public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
+ datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
+ return this;
+ }
+
@Override
public DatastoreContext build() {
datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder()
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else if (message instanceof PersistAbortTransactionPayload) {
final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
- persistPayload(txId, AbortTransactionPayload.create(txId), true);
+ persistPayload(txId, AbortTransactionPayload.create(
+ txId, datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(
+ historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
} else if (callback != null) {
callback.run();
}
}
chain.close();
- replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, CloseLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
/**
return;
}
- replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, PurgeLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
Optional<DataTreeCandidate> readCurrentData() {
void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
- replicatePayload(id, AbortTransactionPayload.create(id), callback);
+ replicatePayload(id, AbortTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
@Override
void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
- replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ replicatePayload(id, PurgeTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate);
+ payload = CommitTransactionPayload.create(txId, candidate,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
pendingCommits.poll().cohort.failedCommit(e);
super(transactionId, serialized);
}
- public static AbortTransactionPayload create(final TransactionIdentifier transactionId) {
- final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ public static AbortTransactionPayload create(final TransactionIdentifier transactionId,
+ final int initialSerializedBufferCapacity) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
try {
transactionId.writeTo(out);
} catch (IOException e) {
super(historyId, serialized);
}
- public static CloseLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
- final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ public static CloseLocalHistoryPayload create(final LocalHistoryIdentifier historyId,
+ final int initialSerializedBufferCapacity) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
try {
historyId.writeTo(out);
} catch (IOException e) {
package org.opendaylight.controller.cluster.datastore.persisted;
import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Payload persisted when a transaction commits. It contains the transaction identifier and the
*/
@Beta
public final class CommitTransactionPayload extends Payload implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
+
private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
private byte[] serialized;
}
public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
- final DataTreeCandidate candidate) throws IOException {
- final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ final DataTreeCandidate candidate, final int initialSerializedBufferCapacity) throws IOException {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
transactionId.writeTo(out);
DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate);
- return new CommitTransactionPayload(out.toByteArray());
+ final byte[] serialized = out.toByteArray();
+
+ // Debug level: this runs once per committed transaction, so logging at info would
+ // flood production logs on every commit.
+ LOG.debug("Initial buffer capacity {}, actual serialized size {}",
+ initialSerializedBufferCapacity, serialized.length);
+
+ return new CommitTransactionPayload(serialized);
+ }
+
+ @VisibleForTesting
+ public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
+ final DataTreeCandidate candidate) throws IOException {
+ // 512 mirrors the initial-payload-serialized-buffer-capacity default declared in the YANG model.
+ return create(transactionId, candidate, 512);
}
public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate() throws IOException {
super(historyId, serialized);
}
- public static CreateLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
- final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ public static CreateLocalHistoryPayload create(final LocalHistoryIdentifier historyId,
+ final int initialSerializedBufferCapacity) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
try {
historyId.writeTo(out);
} catch (IOException e) {
super(historyId, serialized);
}
- public static PurgeLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
- final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ public static PurgeLocalHistoryPayload create(final LocalHistoryIdentifier historyId,
+ final int initialSerializedBufferCapacity) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
try {
historyId.writeTo(out);
} catch (IOException e) {
super(transactionId, serialized);
}
- public static PurgeTransactionPayload create(final TransactionIdentifier transactionId) {
- final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ public static PurgeTransactionPayload create(final TransactionIdentifier transactionId,
+ final int initialSerializedBufferCapacity) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
try {
transactionId.writeTo(out);
} catch (IOException e) {
description "The timeout interval whereby the client front-end hasn't made progress with the
back-end on any request and terminates.";
}
+
+ leaf initial-payload-serialized-buffer-capacity {
+ default 512;
+ type non-zero-uint32-type;
+ description "The initial buffer capacity, in bytes, to use when serializing message payloads.";
+ }
}
container data-store-properties-container {
properties.put(" max shard data change listener queue size", "2222");
properties.put("mAx-shaRd-data-STORE-executor-quEUe-size", "3333");
properties.put("persistent", "false");
+ properties.put("initial-payload-serialized-buffer-capacity", "600");
boolean updated = introspector.update(properties);
assertTrue("updated", updated);
assertEquals(1111, context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
assertEquals(2222, context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
assertEquals(3333, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+ assertEquals(600, context.getInitialPayloadSerializedBufferCapacity());
assertFalse(context.isPersistent());
properties.put("shard-transaction-idle-timeout-in-minutes", "32");
import static org.junit.Assert.assertEquals;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_CONFIGURATION_READER;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
builder.maxShardDataStoreExecutorQueueSize(
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
builder.maximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1);
+ builder.initialPayloadSerializedBufferCapacity(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1);
DatastoreContext context = builder.build();
assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1,
context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1, context.getMaximumMessageSliceSize());
+ assertEquals(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1,
+ context.getInitialPayloadSerializedBufferCapacity());
}
}
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class ShardDataTreeTest extends AbstractTest {
+ private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
private final Shard mockShard = Mockito.mock(Shard.class);
private ShardDataTree shardDataTree;
@Before
public void setUp() {
doReturn(Ticker.systemTicker()).when(mockShard).ticker();
- doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean();
+ doReturn(mock(ShardStats.class)).when(mockShard).getShardMBean();
+ doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
fullSchema = SchemaContextHelper.full();
@Override
AbortTransactionPayload object() {
- return AbortTransactionPayload.create(nextTransactionId());
+ return AbortTransactionPayload.create(nextTransactionId(), 512);
}
}
@Override
CloseLocalHistoryPayload object() {
- return CloseLocalHistoryPayload.create(nextHistoryId());
+ return CloseLocalHistoryPayload.create(nextHistoryId(), 512);
}
-}
\ No newline at end of file
+}
@Override
CreateLocalHistoryPayload object() {
- return CreateLocalHistoryPayload.create(nextHistoryId());
+ return CreateLocalHistoryPayload.create(nextHistoryId(), 512);
}
-}
\ No newline at end of file
+}
@Override
PurgeLocalHistoryPayload object() {
- return PurgeLocalHistoryPayload.create(nextHistoryId());
+ return PurgeLocalHistoryPayload.create(nextHistoryId(), 512);
}
-}
\ No newline at end of file
+}
@Override
PurgeTransactionPayload object() {
- return PurgeTransactionPayload.create(nextTransactionId());
+ return PurgeTransactionPayload.create(nextTransactionId(), 512);
}
-}
\ No newline at end of file
+}