BUG 2464 : Shard dataSize does not seem to correspond to actual memory usage
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index a22e535fad1f6fa7052121c6be736a299e0b48cb..7d6dde9c8af296df1b82f331f431daaa0d431832 100644 (file)
@@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -321,7 +322,7 @@ public class Shard extends RaftActor {
             cohortEntry.getCohort().preCommit().get();
 
             Shard.this.persistData(getSender(), transactionID,
-                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+                    new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -679,6 +680,8 @@ public class Shard extends RaftActor {
     protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else if (data instanceof CompositeModificationByteStringPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
             LOG.error("Unknown state received {} during recovery", data);
         }
@@ -755,19 +758,12 @@ public class Shard extends RaftActor {
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
-            if(modification == null) {
-                LOG.error(
-                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                     identifier, clientActor != null ? clientActor.path().toString() : null);
-            } else if(clientActor == null) {
-                // There's no clientActor to which to send a commit reply so we must be applying
-                // replicated state from the leader.
-                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                        modification, schemaContext));
-            } else {
-                // This must be the OK to commit after replication consensus.
-                finishCommit(clientActor, identifier);
-            }
+            applyModificationToState(clientActor, identifier, modification);
+        } else if(data instanceof CompositeModificationByteStringPayload ){
+            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+            applyModificationToState(clientActor, identifier, modification);
+
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -778,6 +774,22 @@ public class Shard extends RaftActor {
 
     }
 
+    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+        if(modification == null) {
+            LOG.error(
+                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
+        } else if(clientActor == null) {
+            // There's no clientActor to which to send a commit reply so we must be applying
+            // replicated state from the leader.
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+                    modification, schemaContext));
+        } else {
+            // This must be the OK to commit after replication consensus.
+            finishCommit(clientActor, identifier);
+        }
+    }
+
     private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();