BUG 2464 : Shard dataSize does not seem to correspond to actual memory usage 49/13349/7
authorMoiz Raja <moraja@cisco.com>
Wed, 3 Dec 2014 17:58:35 +0000 (09:58 -0800)
committerMoiz Raja <moraja@cisco.com>
Tue, 16 Dec 2014 21:37:43 +0000 (21:37 +0000)
The dataSize that is reported is the "serialized" size of the payload. Since the replicated
log actually contains the CompositeModification object (which may hold on to a lot more memory)
the serialized size is not neccessarily the same as the object size.

To make the data size correspond to memory usage and to actually reduce memory usage this patch
creates a new payload class called CompositeModificationByteStringPayload which only stores the
ByteString which is an order of magnitude smaller. Custom serialization ensures that this object
is written and read correctly.

This patch is backward compatible in that a replicated log containing a CompositeModificationPayload
will be read correctly but is not forward compatible in that if a new controller instance were to send
a CompositeModificationPayload to an older instance it would not work.

To ensure that we do not need to immediately require a conversion from ByteString to PersistentMessages.CompositeModification we maintain a SoftReference to CompositeModification.

Change-Id: I32c921dea2d39ed689aa2fb6f68eb8528be920d0
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java
new file mode 100644 (file)
index 0000000..99de5dd
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeModificationByteStringPayload extends Payload implements
+        Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private ByteString byteString;
+    private SoftReference<PersistentMessages.CompositeModification> modificationReference;
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeModificationByteStringPayload.class);
+
+    public CompositeModificationByteStringPayload(){
+        byteString = null;
+    }
+    public CompositeModificationByteStringPayload(Object modification){
+        this(((PersistentMessages.CompositeModification) modification).toByteString());
+        this.modificationReference = new SoftReference<>((PersistentMessages.CompositeModification) modification);
+    }
+
+    private CompositeModificationByteStringPayload(ByteString byteString){
+        this.byteString = Preconditions.checkNotNull(byteString, "byteString should not be null");
+    }
+
+
+    @Override
+    public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+        Preconditions.checkState(byteString!=null);
+        Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+        map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
+                getModificationInternal());
+        return map;
+    }
+
+    @Override
+    public Payload decode(
+            AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+        PersistentMessages.CompositeModification modification = payload
+                .getExtension(
+                        org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification);
+
+        // The extension was put in the unknown field.
+        // This is because extensions need to be registered
+        // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+        // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+        // If that is not done then on the other end the extension shows up as an unknown field
+        // Need to figure out a better way to do this
+        if(payload.getUnknownFields().hasField(2)){
+            UnknownFieldSet.Field field =
+                    payload.getUnknownFields().getField(2);
+
+            return new CompositeModificationByteStringPayload(field.getLengthDelimitedList().get(0));
+        }
+
+        return new CompositeModificationByteStringPayload(modification);
+    }
+
+    public Object getModification(){
+        return getModificationInternal();
+    }
+
+    private PersistentMessages.CompositeModification getModificationInternal(){
+        if(this.modificationReference != null && this.modificationReference.get() != null){
+            return this.modificationReference.get();
+        }
+        try {
+            PersistentMessages.CompositeModification compositeModification = PersistentMessages.CompositeModification.parseFrom(this.byteString);
+            this.modificationReference = new SoftReference<>(compositeModification);
+            return compositeModification;
+        } catch (InvalidProtocolBufferException e) {
+            LOG.error("Unexpected exception occurred when parsing byteString to CompositeModification", e);
+        }
+
+        return null;
+    }
+
+    public int size(){
+        return byteString.size();
+    }
+
+    private void writeObject(java.io.ObjectOutputStream stream)
+            throws IOException {
+        byteString.writeTo(stream);
+    }
+
+    private void readObject(java.io.ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        byteString = ByteString.readFrom(stream);
+    }
+
+    @VisibleForTesting
+    public void clearModificationReference(){
+        if(this.modificationReference != null) {
+            this.modificationReference.clear();
+        }
+    }
+}
\ No newline at end of file
index a22e535..7d6dde9 100644 (file)
@@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -321,7 +322,7 @@ public class Shard extends RaftActor {
             cohortEntry.getCohort().preCommit().get();
 
             Shard.this.persistData(getSender(), transactionID,
-                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+                    new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -679,6 +680,8 @@ public class Shard extends RaftActor {
     protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else if (data instanceof CompositeModificationByteStringPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
             LOG.error("Unknown state received {} during recovery", data);
         }
@@ -755,19 +758,12 @@ public class Shard extends RaftActor {
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
-            if(modification == null) {
-                LOG.error(
-                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                     identifier, clientActor != null ? clientActor.path().toString() : null);
-            } else if(clientActor == null) {
-                // There's no clientActor to which to send a commit reply so we must be applying
-                // replicated state from the leader.
-                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                        modification, schemaContext));
-            } else {
-                // This must be the OK to commit after replication consensus.
-                finishCommit(clientActor, identifier);
-            }
+            applyModificationToState(clientActor, identifier, modification);
+        } else if(data instanceof CompositeModificationByteStringPayload ){
+            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+            applyModificationToState(clientActor, identifier, modification);
+
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -778,6 +774,22 @@ public class Shard extends RaftActor {
 
     }
 
+    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+        if(modification == null) {
+            LOG.error(
+                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
+        } else if(clientActor == null) {
+            // There's no clientActor to which to send a commit reply so we must be applying
+            // replicated state from the leader.
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+                    modification, schemaContext));
+        } else {
+            // This must be the OK to commit after replication consensus.
+            finishCommit(clientActor, identifier);
+        }
+    }
+
     private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
new file mode 100644 (file)
index 0000000..db9f3d1
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class CompositeModificationByteStringPayloadTest {
+
+    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+
+    @Test
+    public void testSerialization(){
+        WriteModification writeModification =
+                new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+                        .containerNode(TestModel.TEST_QNAME),
+                        TestModel.createTestContext());
+
+        MutableCompositeModification compositeModification =
+                new MutableCompositeModification();
+
+        compositeModification.addModification(writeModification);
+
+        CompositeModificationByteStringPayload compositeModificationByteStringPayload
+                = new CompositeModificationByteStringPayload(compositeModification.toSerializable());
+
+        byte[] bytes = SerializationUtils.serialize(compositeModificationByteStringPayload);
+
+        Object deserialize = SerializationUtils.deserialize(bytes);
+
+        assertTrue(deserialize instanceof CompositeModificationByteStringPayload);
+
+    }
+
+    @Test
+    public void testAppendEntries(){
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+        CompositeModificationByteStringPayload payload = newByteStringPayload(
+                new WriteModification(TestModel.OUTER_LIST_PATH,
+                        ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                        SCHEMA_CONTEXT));
+
+        payload.clearModificationReference();
+
+        entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+
+
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+    }
+
+
+
+    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+}
index 926cef6..2792342 100644 (file)
@@ -75,6 +75,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -432,9 +433,9 @@ public class ShardTest extends AbstractActorTest {
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                           SCHEMA_CONTEXT))));
 
-        int nListEntries = 11;
+        int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
-        for(int i = 1; i <= nListEntries; i++) {
+        for(int i = 1; i <= nListEntries-5; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
@@ -445,6 +446,19 @@ public class ShardTest extends AbstractActorTest {
                     newPayload(mod)));
         }
 
+        // Add some of the new CompositeModificationByteStringPayload
+        for(int i = 11; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+                    SCHEMA_CONTEXT);
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newByteStringPayload(mod)));
+        }
+
+
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
                 new ApplyLogEntries(nListEntries));
 
@@ -516,6 +530,16 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
+    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+
     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification) {