Fixup comparison formatting
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorDelegatingPersistentDataProvider.java
index 466609d8938c7c9674af0d3954fb6b9e7fa6c7f3..0bd86382607f61b827e14a25dc8f86e15f11b70d 100644 (file)
@@ -7,11 +7,13 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.japi.Procedure;
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 
 /**
@@ -23,27 +25,50 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Persis
 class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentDataProvider {
     private final PersistentDataProvider persistentProvider;
 
-    RaftActorDelegatingPersistentDataProvider(DataPersistenceProvider delegate,
-            PersistentDataProvider persistentProvider) {
+    RaftActorDelegatingPersistentDataProvider(final DataPersistenceProvider delegate,
+            final PersistentDataProvider persistentProvider) {
         super(delegate);
-        this.persistentProvider = Preconditions.checkNotNull(persistentProvider);
+        this.persistentProvider = requireNonNull(persistentProvider);
     }
 
     @Override
-    public <T> void persist(T o, Procedure<T> procedure) {
-        if(getDelegate().isRecoveryApplicable()) {
-            super.persist(o, procedure);
-        } else {
-            boolean isPersistentPayload = false;
-            if(o instanceof ReplicatedLogEntry) {
-                isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload;
-            }
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
+        doPersist(entry, procedure, false);
+    }
+
+    @Override
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+        doPersist(entry, procedure, true);
+    }
 
-            if(isPersistentPayload) {
-                persistentProvider.persist(o, procedure);
+    private <T> void doPersist(final T entry, final Procedure<T> procedure, final boolean async) {
+        if (getDelegate().isRecoveryApplicable()) {
+            persistSuper(entry, procedure, async);
+        } else {
+            if (entry instanceof ReplicatedLogEntry) {
+                Payload payload = ((ReplicatedLogEntry)entry).getData();
+                if (payload instanceof PersistentPayload) {
+                    // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
+                    // on recovery if data persistence is later enabled.
+                    if (async) {
+                        persistentProvider.persistAsync(payload, p -> procedure.apply(entry));
+                    } else {
+                        persistentProvider.persist(payload, p -> procedure.apply(entry));
+                    }
+                } else {
+                    persistSuper(entry, procedure, async);
+                }
             } else {
-                super.persist(o, procedure);
+                persistSuper(entry, procedure, async);
             }
         }
     }
+
+    private <T> void persistSuper(final T object, final Procedure<T> procedure, final boolean async) {
+        if (async) {
+            super.persistAsync(object, procedure);
+        } else {
+            super.persist(object, procedure);
+        }
+    }
 }