Merge "BUG 2464 : Shard dataSize does not seem to correspond to actual memory usage"
authorTom Pantelis <tpanteli@brocade.com>
Wed, 17 Dec 2014 23:55:44 +0000 (23:55 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 17 Dec 2014 23:55:45 +0000 (23:55 +0000)
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java
new file mode 100644 (file)
index 0000000..99de5dd
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeModificationByteStringPayload extends Payload implements
+        Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private ByteString byteString;
+    private SoftReference<PersistentMessages.CompositeModification> modificationReference;
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeModificationByteStringPayload.class);
+
+    public CompositeModificationByteStringPayload(){
+        byteString = null;
+    }
+    public CompositeModificationByteStringPayload(Object modification){
+        this(((PersistentMessages.CompositeModification) modification).toByteString());
+        this.modificationReference = new SoftReference<>((PersistentMessages.CompositeModification) modification);
+    }
+
+    private CompositeModificationByteStringPayload(ByteString byteString){
+        this.byteString = Preconditions.checkNotNull(byteString, "byteString should not be null");
+    }
+
+
+    @Override
+    public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+        Preconditions.checkState(byteString!=null);
+        Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+        map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
+                getModificationInternal());
+        return map;
+    }
+
+    @Override
+    public Payload decode(
+            AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+        PersistentMessages.CompositeModification modification = payload
+                .getExtension(
+                        org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification);
+
+        // The extension was put in the unknown field.
+        // This is because extensions need to be registered
+        // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+        // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+        // If that is not done then on the other end the extension shows up as an unknown field
+        // Need to figure out a better way to do this
+        if(payload.getUnknownFields().hasField(2)){
+            UnknownFieldSet.Field field =
+                    payload.getUnknownFields().getField(2);
+
+            return new CompositeModificationByteStringPayload(field.getLengthDelimitedList().get(0));
+        }
+
+        return new CompositeModificationByteStringPayload(modification);
+    }
+
+    public Object getModification(){
+        return getModificationInternal();
+    }
+
+    private PersistentMessages.CompositeModification getModificationInternal(){
+        if(this.modificationReference != null && this.modificationReference.get() != null){
+            return this.modificationReference.get();
+        }
+        try {
+            PersistentMessages.CompositeModification compositeModification = PersistentMessages.CompositeModification.parseFrom(this.byteString);
+            this.modificationReference = new SoftReference<>(compositeModification);
+            return compositeModification;
+        } catch (InvalidProtocolBufferException e) {
+            LOG.error("Unexpected exception occurred when parsing byteString to CompositeModification", e);
+        }
+
+        return null;
+    }
+
+    public int size(){
+        return byteString.size();
+    }
+
+    private void writeObject(java.io.ObjectOutputStream stream)
+            throws IOException {
+        byteString.writeTo(stream);
+    }
+
+    private void readObject(java.io.ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        byteString = ByteString.readFrom(stream);
+    }
+
+    @VisibleForTesting
+    public void clearModificationReference(){
+        if(this.modificationReference != null) {
+            this.modificationReference.clear();
+        }
+    }
+}
\ No newline at end of file
index a22e535fad1f6fa7052121c6be736a299e0b48cb..7d6dde9c8af296df1b82f331f431daaa0d431832 100644 (file)
@@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -321,7 +322,7 @@ public class Shard extends RaftActor {
             cohortEntry.getCohort().preCommit().get();
 
             Shard.this.persistData(getSender(), transactionID,
-                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+                    new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -679,6 +680,8 @@ public class Shard extends RaftActor {
     protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else if (data instanceof CompositeModificationByteStringPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
             LOG.error("Unknown state received {} during recovery", data);
         }
@@ -755,19 +758,12 @@ public class Shard extends RaftActor {
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
-            if(modification == null) {
-                LOG.error(
-                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                     identifier, clientActor != null ? clientActor.path().toString() : null);
-            } else if(clientActor == null) {
-                // There's no clientActor to which to send a commit reply so we must be applying
-                // replicated state from the leader.
-                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                        modification, schemaContext));
-            } else {
-                // This must be the OK to commit after replication consensus.
-                finishCommit(clientActor, identifier);
-            }
+            applyModificationToState(clientActor, identifier, modification);
+        } else if(data instanceof CompositeModificationByteStringPayload ){
+            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+            applyModificationToState(clientActor, identifier, modification);
+
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -778,6 +774,22 @@ public class Shard extends RaftActor {
 
     }
 
+    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+        if(modification == null) {
+            LOG.error(
+                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
+        } else if(clientActor == null) {
+            // There's no clientActor to which to send a commit reply so we must be applying
+            // replicated state from the leader.
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+                    modification, schemaContext));
+        } else {
+            // This must be the OK to commit after replication consensus.
+            finishCommit(clientActor, identifier);
+        }
+    }
+
     private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
new file mode 100644 (file)
index 0000000..db9f3d1
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class CompositeModificationByteStringPayloadTest {
+
+    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+
+    @Test
+    public void testSerialization(){
+        WriteModification writeModification =
+                new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+                        .containerNode(TestModel.TEST_QNAME),
+                        TestModel.createTestContext());
+
+        MutableCompositeModification compositeModification =
+                new MutableCompositeModification();
+
+        compositeModification.addModification(writeModification);
+
+        CompositeModificationByteStringPayload compositeModificationByteStringPayload
+                = new CompositeModificationByteStringPayload(compositeModification.toSerializable());
+
+        byte[] bytes = SerializationUtils.serialize(compositeModificationByteStringPayload);
+
+        Object deserialize = SerializationUtils.deserialize(bytes);
+
+        assertTrue(deserialize instanceof CompositeModificationByteStringPayload);
+
+    }
+
+    @Test
+    public void testAppendEntries(){
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+        CompositeModificationByteStringPayload payload = newByteStringPayload(
+                new WriteModification(TestModel.OUTER_LIST_PATH,
+                        ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                        SCHEMA_CONTEXT));
+
+        payload.clearModificationReference();
+
+        entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+
+
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+    }
+
+
+
+    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+}
index 926cef6ba53ea2805f4310e2d9a55b59371f4094..2792342ab2f3921f451999651477edca5664397b 100644 (file)
@@ -75,6 +75,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -432,9 +433,9 @@ public class ShardTest extends AbstractActorTest {
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                           SCHEMA_CONTEXT))));
 
-        int nListEntries = 11;
+        int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
-        for(int i = 1; i <= nListEntries; i++) {
+        for(int i = 1; i <= nListEntries-5; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
@@ -445,6 +446,19 @@ public class ShardTest extends AbstractActorTest {
                     newPayload(mod)));
         }
 
+        // Add some of the new CompositeModificationByteStringPayload
+        for(int i = 11; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+                    SCHEMA_CONTEXT);
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newByteStringPayload(mod)));
+        }
+
+
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
                 new ApplyLogEntries(nListEntries));
 
@@ -516,6 +530,16 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
+    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+
     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification) {