Specify initial serialization buffer capacity for Payloads 33/78433/4
authorTom Pantelis <tompantelis@gmail.com>
Tue, 4 Dec 2018 18:02:38 +0000 (13:02 -0500)
committerStephen Kitt <skitt@redhat.com>
Tue, 18 Dec 2018 17:05:10 +0000 (17:05 +0000)
JFR shows a lot of re-allocations of the backing byte [] when
serializing CommitTransactionPayload. Specify a reasonable
initial buffer capacity (the default in ByteArrayOutputStream
is 32) to reduce re-allocations. This is also configurable via
the cfg file (default is 512).

Also did the same for the other smaller Payload classes like
PurgeTransactionPayload.

JIRA: CONTROLLER-1870
Change-Id: I7ebced56812bfc102409b5b2a8b7f4b1b54359fc
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
18 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayloadTest.java

index e67ef53..98afd7f 100644 (file)
@@ -33,7 +33,7 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * @author Thomas Pantelis
  */
-// Noo-final for mocking
+// Non-final for mocking
 public class DatastoreContext implements ClientActorConfig {
     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
 
@@ -59,6 +59,7 @@ public class DatastoreContext implements ClientActorConfig {
     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
+    public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
 
     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
 
@@ -92,6 +93,7 @@ public class DatastoreContext implements ClientActorConfig {
     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
+    private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
@@ -132,6 +134,7 @@ public class DatastoreContext implements ClientActorConfig {
         this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
         this.requestTimeout = other.requestTimeout;
         this.noProgressTimeout = other.noProgressTimeout;
+        this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -337,6 +340,10 @@ public class DatastoreContext implements ClientActorConfig {
         return noProgressTimeout;
     }
 
+    public int getInitialPayloadSerializedBufferCapacity() {
+        return initialPayloadSerializedBufferCapacity;
+    }
+
     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
@@ -613,6 +620,11 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
+            datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
+            return this;
+        }
+
         @Override
         public DatastoreContext build() {
             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder()
index 334bd8e..51ee4d7 100644 (file)
@@ -363,7 +363,8 @@ public class Shard extends RaftActor {
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
             } else if (message instanceof PersistAbortTransactionPayload) {
                 final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
-                persistPayload(txId, AbortTransactionPayload.create(txId), true);
+                persistPayload(txId, AbortTransactionPayload.create(
+                        txId, datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
             } else if (message instanceof MakeLeaderLocal) {
                 onMakeLeaderLocal();
             } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
index 92263f2..8b82601 100644 (file)
@@ -537,7 +537,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(
+                    historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
         } else if (callback != null) {
             callback.run();
         }
@@ -597,7 +598,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         chain.close();
-        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, CloseLocalHistoryPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     /**
@@ -616,7 +618,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, PurgeLocalHistoryPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     Optional<DataTreeCandidate> readCurrentData() {
@@ -640,7 +643,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         final TransactionIdentifier id = transaction.getIdentifier();
         LOG.debug("{}: aborting transaction {}", logContext, id);
-        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+        replicatePayload(id, AbortTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     @Override
@@ -660,7 +664,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
         LOG.debug("{}: purging transaction {}", logContext, id);
-        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+        replicatePayload(id, PurgeTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -999,7 +1004,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
-            payload = CommitTransactionPayload.create(txId, candidate);
+            payload = CommitTransactionPayload.create(txId, candidate,
+                    shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);
index ad9bc4a..0e34756 100644 (file)
@@ -54,8 +54,9 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
         super(transactionId, serialized);
     }
 
-    public static AbortTransactionPayload create(final TransactionIdentifier transactionId) {
-        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    public static AbortTransactionPayload create(final TransactionIdentifier transactionId,
+            final int initialSerializedBufferCapacity) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
         try {
             transactionId.writeTo(out);
         } catch (IOException e) {
index ec78d60..9acc113 100644 (file)
@@ -54,8 +54,9 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<
         super(historyId, serialized);
     }
 
-    public static CloseLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
-        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    public static CloseLocalHistoryPayload create(final LocalHistoryIdentifier historyId,
+            final int initialSerializedBufferCapacity) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
         try {
             historyId.writeTo(out);
         } catch (IOException e) {
index 213d61b..704866d 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
@@ -22,6 +23,8 @@ import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Payload persisted when a transaction commits. It contains the transaction identifier and the
@@ -31,6 +34,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
  */
 @Beta
 public final class CommitTransactionPayload extends Payload implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
+
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
         private byte[] serialized;
@@ -73,11 +78,22 @@ public final class CommitTransactionPayload extends Payload implements Serializa
     }
 
     public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
-            final DataTreeCandidate candidate) throws IOException {
-        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+            final DataTreeCandidate candidate, final int initialSerializedBufferCapacity) throws IOException {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
         transactionId.writeTo(out);
         DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate);
-        return new CommitTransactionPayload(out.toByteArray());
+        final byte[] serialized = out.toByteArray();
+
+        LOG.info("Initial buffer capacity {}, actual serialized size {}",
+                initialSerializedBufferCapacity, serialized.length);
+
+        return new CommitTransactionPayload(serialized);
+    }
+
+    @VisibleForTesting
+    public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
+            final DataTreeCandidate candidate) throws IOException {
+        return create(transactionId, candidate, 512);
     }
 
     public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate() throws IOException {
index b7f90e3..dbf72f3 100644 (file)
@@ -54,8 +54,9 @@ public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload
         super(historyId, serialized);
     }
 
-    public static CreateLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
-        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    public static CreateLocalHistoryPayload create(final LocalHistoryIdentifier historyId,
+            final int initialSerializedBufferCapacity) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
         try {
             historyId.writeTo(out);
         } catch (IOException e) {
index c555f3c..8d9a8d2 100644 (file)
@@ -55,8 +55,9 @@ public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<
         super(historyId, serialized);
     }
 
-    public static PurgeLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
-        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    public static PurgeLocalHistoryPayload create(final LocalHistoryIdentifier historyId,
+            final int initialSerializedBufferCapacity) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
         try {
             historyId.writeTo(out);
         } catch (IOException e) {
index 3141d69..ac84972 100644 (file)
@@ -54,8 +54,9 @@ public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<T
         super(transactionId, serialized);
     }
 
-    public static PurgeTransactionPayload create(final TransactionIdentifier transactionId) {
-        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    public static PurgeTransactionPayload create(final TransactionIdentifier transactionId,
+            final int initialSerializedBufferCapacity) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
         try {
             transactionId.writeTo(out);
         } catch (IOException e) {
index a49ca8a..af37589 100644 (file)
@@ -261,6 +261,12 @@ module distributed-datastore-provider {
             description "The timeout interval whereby the client front-end hasn't made progress with the
                          back-end on any request and terminates.";
         }
+
+        leaf initial-payload-serialized-buffer-capacity {
+            default 512;
+            type non-zero-uint32-type;
+            description "The initial buffer capacity, in bytes, to use when serializing message payloads.";
+        }
     }
 
     container data-store-properties-container {
index 95ec760..14866d4 100644 (file)
@@ -96,6 +96,7 @@ public class DatastoreContextIntrospectorTest {
         properties.put(" max shard data change listener queue size", "2222");
         properties.put("mAx-shaRd-data-STORE-executor-quEUe-size", "3333");
         properties.put("persistent", "false");
+        properties.put("initial-payload-serialized-buffer-capacity", "600");
 
         boolean updated = introspector.update(properties);
         assertTrue("updated", updated);
@@ -119,6 +120,7 @@ public class DatastoreContextIntrospectorTest {
         assertEquals(1111, context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
         assertEquals(2222, context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
         assertEquals(3333, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+        assertEquals(600, context.getInitialPayloadSerializedBufferCapacity());
         assertFalse(context.isPersistent());
 
         properties.put("shard-transaction-idle-timeout-in-minutes", "32");
index c077622..6bd8f36 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_CONFIGURATION_READER;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
@@ -104,6 +105,7 @@ public class DatastoreContextTest {
         builder.maxShardDataStoreExecutorQueueSize(
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
         builder.maximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1);
+        builder.initialPayloadSerializedBufferCapacity(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1);
 
         DatastoreContext context = builder.build();
 
@@ -155,5 +157,7 @@ public class DatastoreContextTest {
         assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1,
                 context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
         assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1, context.getMaximumMessageSliceSize());
+        assertEquals(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1,
+                context.getInitialPayloadSerializedBufferCapacity());
     }
 }
index df4c38a..814f907 100644 (file)
@@ -12,9 +12,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -67,6 +67,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class ShardDataTreeTest extends AbstractTest {
+    private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
 
     private final Shard mockShard = Mockito.mock(Shard.class);
     private ShardDataTree shardDataTree;
@@ -75,7 +76,8 @@ public class ShardDataTreeTest extends AbstractTest {
     @Before
     public void setUp() {
         doReturn(Ticker.systemTicker()).when(mockShard).ticker();
-        doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean();
+        doReturn(mock(ShardStats.class)).when(mockShard).getShardMBean();
+        doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
 
         fullSchema = SchemaContextHelper.full();
 
index c6785c0..eeed061 100644 (file)
@@ -11,6 +11,6 @@ public class CloseLocalHistoryPayloadTest extends AbstractIdentifiablePayloadTes
 
     @Override
     CloseLocalHistoryPayload object() {
-        return CloseLocalHistoryPayload.create(nextHistoryId());
+        return CloseLocalHistoryPayload.create(nextHistoryId(), 512);
     }
-}
\ No newline at end of file
+}
index a7d7e93..e0aef36 100644 (file)
@@ -11,6 +11,6 @@ public class CreateLocalHistoryPayloadTest extends AbstractIdentifiablePayloadTe
 
     @Override
     CreateLocalHistoryPayload object() {
-        return CreateLocalHistoryPayload.create(nextHistoryId());
+        return CreateLocalHistoryPayload.create(nextHistoryId(), 512);
     }
-}
\ No newline at end of file
+}
index 8e5423a..3a3ded1 100644 (file)
@@ -11,6 +11,6 @@ public class PurgeLocalHistoryPayloadTest extends AbstractIdentifiablePayloadTes
 
     @Override
     PurgeLocalHistoryPayload object() {
-        return PurgeLocalHistoryPayload.create(nextHistoryId());
+        return PurgeLocalHistoryPayload.create(nextHistoryId(), 512);
     }
-}
\ No newline at end of file
+}
index ff1d30f..cf59654 100644 (file)
@@ -11,6 +11,6 @@ public class PurgeTransactionPayloadTest extends AbstractIdentifiablePayloadTest
 
     @Override
     PurgeTransactionPayload object() {
-        return PurgeTransactionPayload.create(nextTransactionId());
+        return PurgeTransactionPayload.create(nextTransactionId(), 512);
     }
-}
\ No newline at end of file
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.