From deb9baa6423b052d51bb4ea354b8582aac76e41e Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Wed, 3 Dec 2014 09:58:35 -0800 Subject: [PATCH] BUG 2464 : Shard dataSize does not seem to correspond to actual memory usage The dataSize that is reported is the "serialized" size of the payload. Since the replicated log actually contains the CompositeModification object (which may hold on to a lot more memory) the serialized size is not neccessarily the same as the object size. To make the data size correspond to memory usage and to actually reduce memory usage this patch creates a new payload class called CompositeModificationByteStringPayload which only stores the ByteString which is an order of magnitude smaller. Custom serialization ensures that this object is written and read correctly. This patch is backward compatible in that a replicated log containing a CompositeModificationPayload will be read correctly but is not forward compatible in that if a new controller instance were to send a CompositeModificationPayload to an older instance it would not work. To ensure that we do not need to immediately require a conversion from ByteString to PersistentMessages.CompositeModification we maintain a SoftReference to CompositeModification. Change-Id: I32c921dea2d39ed689aa2fb6f68eb8528be920d0 Signed-off-by: Moiz Raja --- ...ompositeModificationByteStringPayload.java | 119 ++++++++++++++++++ .../controller/cluster/datastore/Shard.java | 40 +++--- ...siteModificationByteStringPayloadTest.java | 83 ++++++++++++ .../cluster/datastore/ShardTest.java | 28 ++++- 4 files changed, 254 insertions(+), 16 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java new file mode 100644 index 0000000000..99de5dde35 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.protobuff.client.messages; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.UnknownFieldSet; +import java.io.IOException; +import java.io.Serializable; +import java.lang.ref.SoftReference; +import java.util.HashMap; +import java.util.Map; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompositeModificationByteStringPayload extends Payload implements + Serializable { + private static final long serialVersionUID = 1L; + + private ByteString byteString; + private SoftReference modificationReference; + private static final Logger LOG = LoggerFactory.getLogger(CompositeModificationByteStringPayload.class); + + public CompositeModificationByteStringPayload(){ + byteString = null; + } + public CompositeModificationByteStringPayload(Object modification){ + this(((PersistentMessages.CompositeModification) modification).toByteString()); + this.modificationReference = new SoftReference<>((PersistentMessages.CompositeModification) modification); + } + + private CompositeModificationByteStringPayload(ByteString byteString){ + this.byteString = Preconditions.checkNotNull(byteString, "byteString should not be null"); + } + + + @Override + public Map encode() { + Preconditions.checkState(byteString!=null); + Map map = new HashMap<>(); + map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification, + getModificationInternal()); + return map; + } + + @Override + public Payload decode( + AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) { + PersistentMessages.CompositeModification modification = payload + .getExtension( + org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification); + + // The extension was put in the unknown field. + // This is because extensions need to be registered + // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions + // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry + // If that is not done then on the other end the extension shows up as an unknown field + // Need to figure out a better way to do this + if(payload.getUnknownFields().hasField(2)){ + UnknownFieldSet.Field field = + payload.getUnknownFields().getField(2); + + return new CompositeModificationByteStringPayload(field.getLengthDelimitedList().get(0)); + } + + return new CompositeModificationByteStringPayload(modification); + } + + public Object getModification(){ + return getModificationInternal(); + } + + private PersistentMessages.CompositeModification getModificationInternal(){ + if(this.modificationReference != null && this.modificationReference.get() != null){ + return this.modificationReference.get(); + } + try { + PersistentMessages.CompositeModification compositeModification = PersistentMessages.CompositeModification.parseFrom(this.byteString); + this.modificationReference = new SoftReference<>(compositeModification); + return compositeModification; + } catch (InvalidProtocolBufferException e) { + LOG.error("Unexpected exception occurred when parsing byteString to CompositeModification", e); + } + + return null; + } + + public int size(){ + return byteString.size(); + } + + private void writeObject(java.io.ObjectOutputStream stream) + throws IOException { + byteString.writeTo(stream); + } + + private void readObject(java.io.ObjectInputStream stream) + throws IOException, ClassNotFoundException { + byteString = ByteString.readFrom(stream); + } + + @VisibleForTesting + public void clearModificationReference(){ + if(this.modificationReference != null) { + this.modificationReference.clear(); + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a22e535fad..7d6dde9c8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -321,7 +322,7 @@ public class Shard extends RaftActor { cohortEntry.getCohort().preCommit().get(); Shard.this.persistData(getSender(), transactionID, - new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); + new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable())); } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); @@ -679,6 +680,8 @@ public class Shard extends RaftActor { protected void appendRecoveredLogEntry(final Payload data) { if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); + } else if (data instanceof CompositeModificationByteStringPayload) { + currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); } else { LOG.error("Unknown state received {} during recovery", data); } @@ -755,19 +758,12 @@ public class Shard extends RaftActor { if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); - if(modification == null) { - LOG.error( - "modification is null - this is very unexpected, clientActor = {}, identifier = {}", - identifier, clientActor != null ? clientActor.path().toString() : null); - } else if(clientActor == null) { - // There's no clientActor to which to send a commit reply so we must be applying - // replicated state from the leader. - commitWithNewTransaction(MutableCompositeModification.fromSerializable( - modification, schemaContext)); - } else { - // This must be the OK to commit after replication consensus. - finishCommit(clientActor, identifier); - } + applyModificationToState(clientActor, identifier, modification); + } else if(data instanceof CompositeModificationByteStringPayload ){ + Object modification = ((CompositeModificationByteStringPayload) data).getModification(); + + applyModificationToState(clientActor, identifier, modification); + } else { LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), @@ -778,6 +774,22 @@ public class Shard extends RaftActor { } + private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { + if(modification == null) { + LOG.error( + "modification is null - this is very unexpected, clientActor = {}, identifier = {}", + identifier, clientActor != null ? clientActor.path().toString() : null); + } else if(clientActor == null) { + // There's no clientActor to which to send a commit reply so we must be applying + // replicated state from the leader. + commitWithNewTransaction(MutableCompositeModification.fromSerializable( + modification, schemaContext)); + } else { + // This must be the OK to commit after replication consensus. + finishCommit(clientActor, identifier); + } + } + private void updateJournalStats() { ReplicatedLogEntry lastLogEntry = getLastLogEntry(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java new file mode 100644 index 0000000000..db9f3d1801 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class CompositeModificationByteStringPayloadTest { + + private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext(); + + @Test + public void testSerialization(){ + WriteModification writeModification = + new WriteModification(TestModel.TEST_PATH, ImmutableNodes + .containerNode(TestModel.TEST_QNAME), + TestModel.createTestContext()); + + MutableCompositeModification compositeModification = + new MutableCompositeModification(); + + compositeModification.addModification(writeModification); + + CompositeModificationByteStringPayload compositeModificationByteStringPayload + = new CompositeModificationByteStringPayload(compositeModification.toSerializable()); + + byte[] bytes = SerializationUtils.serialize(compositeModificationByteStringPayload); + + Object deserialize = SerializationUtils.deserialize(bytes); + + assertTrue(deserialize instanceof CompositeModificationByteStringPayload); + + } + + @Test + public void testAppendEntries(){ + List entries = new ArrayList<>(); + + CompositeModificationByteStringPayload payload = newByteStringPayload( + new WriteModification(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + SCHEMA_CONTEXT)); + + payload.clearModificationReference(); + + entries.add(new ReplicatedLogImplEntry(0, 1, payload)); + + + assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable()); + } + + + + private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) { + MutableCompositeModification compMod = new MutableCompositeModification(); + for(Modification mod: mods) { + compMod.addModification(mod); + } + + return new CompositeModificationByteStringPayload(compMod.toSerializable()); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 926cef6ba5..2792342ab2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -75,6 +75,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -432,9 +433,9 @@ public class ShardTest extends AbstractActorTest { ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), SCHEMA_CONTEXT)))); - int nListEntries = 11; + int nListEntries = 16; Set listEntryKeys = new HashSet<>(); - for(int i = 1; i <= nListEntries; i++) { + for(int i = 1; i <= nListEntries-5; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); @@ -445,6 +446,19 @@ public class ShardTest extends AbstractActorTest { newPayload(mod))); } + // Add some of the new CompositeModificationByteStringPayload + for(int i = 11; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + Modification mod = new MergeModification(path, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i), + SCHEMA_CONTEXT); + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + newByteStringPayload(mod))); + } + + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); @@ -516,6 +530,16 @@ public class ShardTest extends AbstractActorTest { return new CompositeModificationPayload(compMod.toSerializable()); } + private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) { + MutableCompositeModification compMod = new MutableCompositeModification(); + for(Modification mod: mods) { + compMod.addModification(mod); + } + + return new CompositeModificationByteStringPayload(compMod.toSerializable()); + } + + private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification) { -- 2.36.6