*/
package org.opendaylight.controller.cluster.raft;
+import static java.util.Objects.requireNonNull;
+
import akka.japi.Procedure;
-import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
/**
class RaftActorDelegatingPersistentDataProvider extends DelegatingPersistentDataProvider {
private final PersistentDataProvider persistentProvider;
- RaftActorDelegatingPersistentDataProvider(DataPersistenceProvider delegate,
- PersistentDataProvider persistentProvider) {
+ RaftActorDelegatingPersistentDataProvider(final DataPersistenceProvider delegate,
+ final PersistentDataProvider persistentProvider) {
super(delegate);
- this.persistentProvider = Preconditions.checkNotNull(persistentProvider);
+ this.persistentProvider = requireNonNull(persistentProvider);
}
@Override
- public <T> void persist(T o, Procedure<T> procedure) {
- if(getDelegate().isRecoveryApplicable()) {
- super.persist(o, procedure);
- } else {
- boolean isPersistentPayload = false;
- if(o instanceof ReplicatedLogEntry) {
- isPersistentPayload = ((ReplicatedLogEntry)o).getData() instanceof PersistentPayload;
- }
+ public <T> void persist(final T entry, final Procedure<T> procedure) {
+ doPersist(entry, procedure, false);
+ }
+
+ @Override
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
+ doPersist(entry, procedure, true);
+ }
- if(isPersistentPayload) {
- persistentProvider.persist(o, procedure);
+ private <T> void doPersist(final T entry, final Procedure<T> procedure, final boolean async) {
+ if (getDelegate().isRecoveryApplicable()) {
+ persistSuper(entry, procedure, async);
+ } else {
+ if (entry instanceof ReplicatedLogEntry) {
+ Payload payload = ((ReplicatedLogEntry)entry).getData();
+ if (payload instanceof PersistentPayload) {
+ // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
+ // on recovery if data persistence is later enabled.
+ if (async) {
+ persistentProvider.persistAsync(payload, p -> procedure.apply(entry));
+ } else {
+ persistentProvider.persist(payload, p -> procedure.apply(entry));
+ }
+ } else {
+ persistSuper(entry, procedure, async);
+ }
} else {
- super.persist(o, procedure);
+ persistSuper(entry, procedure, async);
}
}
}
+
+ private <T> void persistSuper(final T object, final Procedure<T> procedure, final boolean async) {
+ if (async) {
+ super.persistAsync(object, procedure);
+ } else {
+ super.persist(object, procedure);
+ }
+ }
}