X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorDelegatingPersistentDataProvider.java;h=0bd86382607f61b827e14a25dc8f86e15f11b70d;hp=466609d8938c7c9674af0d3954fb6b9e7fa6c7f3;hb=73ab61a037dd2489600acbc1eaf6f9ee549c204a;hpb=93ccb43ad0f5c78337f19884a51e2bd479cc46fd diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java index 466609d893..0bd8638260 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProvider.java @@ -7,11 +7,13 @@ */ package org.opendaylight.controller.cluster.raft; +import static java.util.Objects.requireNonNull; + import akka.japi.Procedure; -import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload; /** @@ -23,27 +25,50 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Persis class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentDataProvider { private final PersistentDataProvider persistentProvider; - RaftActorDelegatingPersistentDataProvider(DataPersistenceProvider delegate, - PersistentDataProvider persistentProvider) { + RaftActorDelegatingPersistentDataProvider(final DataPersistenceProvider delegate, + final PersistentDataProvider persistentProvider) { super(delegate); - this.persistentProvider = Preconditions.checkNotNull(persistentProvider); + this.persistentProvider = requireNonNull(persistentProvider); } @Override - public void persist(T o, Procedure procedure) { - if(getDelegate().isRecoveryApplicable()) { - super.persist(o, procedure); - } else { - boolean isPersistentPayload = false; - if(o instanceof ReplicatedLogEntry) { - isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload; - } + public void persist(final T entry, final Procedure procedure) { + doPersist(entry, procedure, false); + } + + @Override + public void persistAsync(final T entry, final Procedure procedure) { + doPersist(entry, procedure, true); + } - if(isPersistentPayload) { - persistentProvider.persist(o, procedure); + private void doPersist(final T entry, final Procedure procedure, final boolean async) { + if (getDelegate().isRecoveryApplicable()) { + persistSuper(entry, procedure, async); + } else { + if (entry instanceof ReplicatedLogEntry) { + Payload payload = ((ReplicatedLogEntry)entry).getData(); + if (payload instanceof PersistentPayload) { + // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes + // on recovery if data persistence is later enabled. + if (async) { + persistentProvider.persistAsync(payload, p -> procedure.apply(entry)); + } else { + persistentProvider.persist(payload, p -> procedure.apply(entry)); + } + } else { + persistSuper(entry, procedure, async); + } } else { - super.persist(o, procedure); + persistSuper(entry, procedure, async); } } } + + private void persistSuper(final T object, final Procedure procedure, final boolean async) { + if (async) { + super.persistAsync(object, procedure); + } else { + super.persist(object, procedure); + } + } }