Refactor ReplicatedLogImpl to separate class 11/17211/6
authorTom Pantelis <tpanteli@brocade.com>
Thu, 26 Mar 2015 05:25:11 +0000 (01:25 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 30 Mar 2015 04:10:44 +0000 (00:10 -0400)
To reduce the size of the RaftActor class for improved readability.

Change-Id: I845cfeb4bc48f0c5eb96ba2cc0a925d9ce5fa416
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java

index a13b6ff..1c30fe2 100644 (file)
@@ -43,7 +43,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -107,7 +107,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * The current state determines the current behavior of a RaftActor
      * A Raft Actor always starts off in the Follower State
      */
-    private RaftActorBehavior currentBehavior;
+    private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
 
     /**
      * This context should NOT be passed directly to any other actor it is
@@ -119,11 +119,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
 
-    /**
-     * The in-memory journal
-     */
-    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -139,9 +134,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
-            -1, -1, replicatedLog, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
-            LOG);
+            -1, -1, peerAddresses,
+            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), LOG);
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
     }
 
     private void initRecoveryTimer() {
@@ -184,7 +180,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             } else if (message instanceof ApplyJournalEntries) {
                 onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
             } else if (message instanceof DeleteEntries) {
-                replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
             } else if (message instanceof UpdateElectionTerm) {
                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
                         ((UpdateElectionTerm) message).getVotedFor());
@@ -219,9 +215,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         // Create a replicated log with the snapshot information
         // The replicated log can be used later on to retrieve this snapshot
         // when we need to install it on a peer
-        replicatedLog = new ReplicatedLogImpl(snapshot);
 
-        context.setReplicatedLog(replicatedLog);
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+                currentBehavior));
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
 
@@ -232,8 +228,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
+                replicatedLog().size(), persistenceId(), timer.toString(),
+                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
@@ -241,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
-        replicatedLog.append(logEntry);
+        replicatedLog().append(logEntry);
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
@@ -251,7 +247,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog.get(i));
+            batchRecoveredLogEntry(replicatedLog().get(i));
         }
 
         context.setLastApplied(toIndex);
@@ -297,8 +293,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm(), replicatedLog.size());
+            replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+            replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         initializeBehavior();
     }
@@ -308,9 +304,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
-        reusableBehaviorStateHolder.init(currentBehavior);
-        currentBehavior = newBehavior;
-        handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+        reusableBehaviorStateHolder.init(getCurrentBehavior());
+        setCurrentBehavior(newBehavior);
+        handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
     @Override public void handleCommand(Object message) {
@@ -353,8 +349,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            replicatedLog = new ReplicatedLogImpl(snapshot);
-            context.setReplicatedLog(replicatedLog);
+            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+                    currentBehavior));
             context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
@@ -391,11 +387,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message.equals(COMMIT_SNAPSHOT)) {
             commitSnapshot(-1);
         } else {
-            reusableBehaviorStateHolder.init(currentBehavior);
+            reusableBehaviorStateHolder.init(getCurrentBehavior());
 
-            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+            setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
 
-            handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+            handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
         }
     }
 
@@ -405,17 +401,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
-                .inMemoryJournalDataSize(replicatedLog.dataSize())
-                .inMemoryJournalLogSize(replicatedLog.size())
+                .inMemoryJournalDataSize(replicatedLog().dataSize())
+                .inMemoryJournalLogSize(replicatedLog().size())
                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
                 .lastApplied(context.getLastApplied())
-                .lastIndex(replicatedLog.lastIndex())
-                .lastTerm(replicatedLog.lastTerm())
+                .lastIndex(replicatedLog().lastIndex())
+                .lastTerm(replicatedLog().lastTerm())
                 .leader(getLeaderId())
                 .raftState(currentBehavior.state().toString())
                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
-                .snapshotIndex(replicatedLog.getSnapshotIndex())
-                .snapshotTerm(replicatedLog.getSnapshotTerm())
+                .snapshotIndex(replicatedLog().getSnapshotIndex())
+                .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
                 .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
 
@@ -425,8 +421,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             builder.lastLogTerm(lastLogEntry.getTerm());
         }
 
-        if(currentBehavior instanceof AbstractLeader) {
-            AbstractLeader leader = (AbstractLeader)currentBehavior;
+        if(getCurrentBehavior() instanceof AbstractLeader) {
+            AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
             Collection<String> followerIds = leader.getFollowerIds();
             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
             for(String id: followerIds) {
@@ -490,39 +486,49 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog
-                .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
-                        if(!hasFollowers()){
-                            // Increment the Commit Index and the Last Applied values
-                            raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                            raftContext.setLastApplied(replicatedLogEntry.getIndex());
+        replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
+            @Override
+            public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
+                if(!hasFollowers()){
+                    // Increment the Commit Index and the Last Applied values
+                    raftContext.setCommitIndex(replicatedLogEntry.getIndex());
+                    raftContext.setLastApplied(replicatedLogEntry.getIndex());
 
-                            // Apply the state immediately
-                            applyState(clientActor, identifier, data);
+                    // Apply the state immediately
+                    applyState(clientActor, identifier, data);
 
-                            // Send a ApplyJournalEntries message so that we write the fact that we applied
-                            // the state to durable storage
-                            self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+                    // Send a ApplyJournalEntries message so that we write the fact that we applied
+                    // the state to durable storage
+                    self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                            context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+                    context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
 
-                        } else if (clientActor != null) {
-                            // Send message for replication
-                            currentBehavior.handleMessage(getSelf(),
-                                    new Replicate(clientActor, identifier,
-                                            replicatedLogEntry)
-                            );
-                        }
+                } else if (clientActor != null) {
+                    // Send message for replication
+                    currentBehavior.handleMessage(getSelf(),
+                            new Replicate(clientActor, identifier, replicatedLogEntry));
+                }
+            }
+        });
+    }
 
-                    }
-                });    }
+    private ReplicatedLog replicatedLog() {
+        return context.getReplicatedLog();
+    }
 
     protected String getId() {
         return context.getId();
     }
 
+    @VisibleForTesting
+    void setCurrentBehavior(RaftActorBehavior behavior) {
+        currentBehavior.setDelegate(behavior);
+    }
+
+    protected RaftActorBehavior getCurrentBehavior() {
+        return currentBehavior.getDelegate();
+    }
+
     /**
      * Derived actors can call the isLeader method to check if the current
      * RaftActor is the Leader or not
@@ -563,7 +569,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected ReplicatedLogEntry getLastLogEntry() {
-        return replicatedLog.last();
+        return replicatedLog().last();
     }
 
     protected Long getCurrentTerm(){
@@ -750,125 +756,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
-    }
-
-    protected long getTotalMemory() {
-        return Runtime.getRuntime().totalMemory();
+        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
     }
 
     protected boolean hasFollowers(){
-        return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
-    }
-
-    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-        private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0L;
-
-
-        public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries());
-        }
-
-        public ReplicatedLogImpl() {
-            super();
-        }
-
-        @Override public void removeFromAndPersist(long logEntryIndex) {
-            int adjustedIndex = adjustedIndex(logEntryIndex);
-
-            if (adjustedIndex < 0) {
-                return;
-            }
-
-            // FIXME: Maybe this should be done after the command is saved
-            journal.subList(adjustedIndex , journal.size()).clear();
-
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
-
-                @Override
-                public void apply(DeleteEntries param)
-                        throws Exception {
-                    //FIXME : Doing nothing for now
-                    dataSize = 0;
-                    for (ReplicatedLogEntry entry : journal) {
-                        dataSize += entry.size();
-                    }
-                }
-            });
-        }
-
-        @Override public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry) {
-            appendAndPersist(replicatedLogEntry, null);
-        }
-
-        public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback)  {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
-            }
-
-            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-            journal.add(replicatedLogEntry);
-
-            // When persisting events with persist it is guaranteed that the
-            // persistent actor will not receive further commands between the
-            // persist call and the execution(s) of the associated event
-            // handler. This also holds for multiple persist calls in context
-            // of a single command.
-            persistence().persist(replicatedLogEntry,
-                new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry evt) throws Exception {
-                        int logEntrySize = replicatedLogEntry.size();
-
-                        dataSize += logEntrySize;
-                        long dataSizeForCheck = dataSize;
-
-                        dataSizeSinceLastSnapshot += logEntrySize;
-
-                        if (!hasFollowers()) {
-                            // When we do not have followers we do not maintain an in-memory log
-                            // due to this the journalSize will never become anything close to the
-                            // snapshot batch count. In fact will mostly be 1.
-                            // Similarly since the journal's dataSize depends on the entries in the
-                            // journal the journal's dataSize will never reach a value close to the
-                            // memory threshold.
-                            // By maintaining the dataSize outside the journal we are tracking essentially
-                            // what we have written to the disk however since we no longer are in
-                            // need of doing a snapshot just for the sake of freeing up memory we adjust
-                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
-                            // as if we were maintaining a real snapshot
-                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
-                        }
-                        long journalSize = replicatedLogEntry.getIndex() + 1;
-                        long dataThreshold = getTotalMemory() *
-                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
-                                || dataSizeForCheck > dataThreshold)) {
-
-                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
-                                    currentBehavior.getReplicatedToAllIndex());
-
-                            if(started){
-                                dataSizeSinceLastSnapshot = 0;
-                            }
-
-                        }
-
-                        if (callback != null){
-                            callback.apply(replicatedLogEntry);
-                        }
-                    }
-                }
-            );
-        }
-
+        return getRaftActorContext().hasFollowers();
     }
 
     static class DeleteEntries implements Serializable {
@@ -911,15 +803,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    @VisibleForTesting
-    void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
-        currentBehavior = behavior;
-    }
-
-    protected RaftActorBehavior getCurrentBehavior() {
-        return currentBehavior;
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;
index 2e7eb5e..9f4b7cb 100644 (file)
@@ -12,6 +12,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import java.util.Map;
 import org.slf4j.Logger;
 
@@ -168,4 +170,11 @@ public interface RaftActorContext {
 
     SnapshotManager getSnapshotManager();
 
+    boolean hasFollowers();
+
+    long getTotalMemory();
+
+    @VisibleForTesting
+    void setTotalMemoryRetriever(Supplier<Long> retriever);
+
 }
index eb059d6..684845c 100644 (file)
@@ -14,6 +14,8 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import java.util.Map;
 import org.slf4j.Logger;
 
@@ -39,25 +41,22 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private ConfigParams configParams;
 
-    private boolean snapshotCaptureInitiated;
+    @VisibleForTesting
+    private Supplier<Long> totalMemoryRetriever;
 
     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
     // be passed to it in the constructor
     private SnapshotManager snapshotManager;
 
-    public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
-        String id,
-        ElectionTerm termInformation, long commitIndex,
-        long lastApplied, ReplicatedLog replicatedLog,
-        Map<String, String> peerAddresses, ConfigParams configParams,
-        Logger logger) {
+    public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
+            ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
+            ConfigParams configParams, Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
         this.termInformation = termInformation;
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.replicatedLog = replicatedLog;
         this.peerAddresses = peerAddresses;
         this.configParams = configParams;
         this.LOG = logger;
@@ -161,10 +160,26 @@ public class RaftActorContextImpl implements RaftActorContext {
         peerAddresses.put(peerId, peerAddress);
     }
 
+    @Override
     public SnapshotManager getSnapshotManager() {
         if(snapshotManager == null){
             snapshotManager = new SnapshotManager(this, LOG);
         }
         return snapshotManager;
     }
+
+    @Override
+    public long getTotalMemory() {
+        return totalMemoryRetriever != null ? totalMemoryRetriever.get() : Runtime.getRuntime().totalMemory();
+    }
+
+    @Override
+    public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+        totalMemoryRetriever = retriever;
+    }
+
+    @Override
+    public boolean hasFollowers() {
+        return getPeerAddresses().keySet().size() > 0;
+    }
 }
index 82d0839..3e4d727 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import akka.japi.Procedure;
 import java.util.List;
 
 /**
@@ -85,6 +86,8 @@ public interface ReplicatedLog {
      */
     void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
 
+    void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
+
     /**
      *
      * @param index the index of the log entry
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
new file mode 100644 (file)
index 0000000..fdb6305
--- /dev/null
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+/**
+ * Implementation of ReplicatedLog used by the RaftActor.
+ */
+class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+    private static final int DATA_SIZE_DIVIDER = 5;
+
+    private long dataSizeSinceLastSnapshot = 0L;
+    private final RaftActorContext context;
+    private final DataPersistenceProvider persistence;
+    private final RaftActorBehavior currentBehavior;
+
+    private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
+        @Override
+        public void apply(DeleteEntries param) {
+            dataSize = 0;
+            for (ReplicatedLogEntry entry : journal) {
+                dataSize += entry.size();
+            }
+        }
+    };
+
+    static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
+            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+                snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+    }
+
+    static ReplicatedLog newInstance(RaftActorContext context,
+            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
+                persistence, currentBehavior);
+    }
+
+    private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
+            RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        super(snapshotIndex, snapshotTerm, unAppliedEntries);
+        this.context = context;
+        this.persistence = persistence;
+        this.currentBehavior = currentBehavior;
+    }
+
+    @Override
+    public void removeFromAndPersist(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+
+        if (adjustedIndex < 0) {
+            return;
+        }
+
+        // FIXME: Maybe this should be done after the command is saved
+        journal.subList(adjustedIndex , journal.size()).clear();
+
+        persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+    }
+
+    @Override
+    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+        appendAndPersist(replicatedLogEntry, null);
+    }
+
+    @Override
+    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback)  {
+
+        if(context.getLogger().isDebugEnabled()) {
+            context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
+        }
+
+        // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+        journal.add(replicatedLogEntry);
+
+        // When persisting events with persist it is guaranteed that the
+        // persistent actor will not receive further commands between the
+        // persist call and the execution(s) of the associated event
+        // handler. This also holds for multiple persist calls in context
+        // of a single command.
+        persistence.persist(replicatedLogEntry,
+            new Procedure<ReplicatedLogEntry>() {
+                @Override
+                public void apply(ReplicatedLogEntry evt) throws Exception {
+                    int logEntrySize = replicatedLogEntry.size();
+
+                    dataSize += logEntrySize;
+                    long dataSizeForCheck = dataSize;
+
+                    dataSizeSinceLastSnapshot += logEntrySize;
+
+                    if (!context.hasFollowers()) {
+                        // When we do not have followers we do not maintain an in-memory log
+                        // due to this the journalSize will never become anything close to the
+                        // snapshot batch count. In fact will mostly be 1.
+                        // Similarly since the journal's dataSize depends on the entries in the
+                        // journal the journal's dataSize will never reach a value close to the
+                        // memory threshold.
+                        // By maintaining the dataSize outside the journal we are tracking essentially
+                        // what we have written to the disk however since we no longer are in
+                        // need of doing a snapshot just for the sake of freeing up memory we adjust
+                        // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+                        // as if we were maintaining a real snapshot
+                        dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+                    }
+                    long journalSize = replicatedLogEntry.getIndex() + 1;
+                    long dataThreshold = context.getTotalMemory() *
+                            context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
+                    if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                            || dataSizeForCheck > dataThreshold)) {
+
+                        boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                                currentBehavior.getReplicatedToAllIndex());
+
+                        if(started){
+                            dataSizeSinceLastSnapshot = 0;
+                        }
+                    }
+
+                    if (callback != null){
+                        callback.apply(replicatedLogEntry);
+                    }
+                }
+            }
+        );
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java
new file mode 100644 (file)
index 0000000..776dae7
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+/**
+ * A RaftActorBehavior implementation that delegates to another implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelegatingRaftActorBehavior implements RaftActorBehavior {
+    private RaftActorBehavior delegate;
+
+    public RaftActorBehavior getDelegate() {
+        return delegate;
+    }
+
+    public void setDelegate(RaftActorBehavior delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+
+    @Override
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+        return delegate.handleMessage(sender, message);
+    }
+
+    @Override
+    public RaftState state() {
+        return delegate.state();
+    }
+
+    @Override
+    public String getLeaderId() {
+        return delegate.getLeaderId();
+    }
+
+    @Override
+    public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+        delegate.setReplicatedToAllIndex(replicatedToAllIndex);
+    }
+
+    @Override
+    public long getReplicatedToAllIndex() {
+        return delegate.getReplicatedToAllIndex();
+    }
+}
index dfaa8d5..b910313 100644 (file)
@@ -18,6 +18,7 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
@@ -52,7 +53,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
         private volatile byte[] snapshot;
-        private volatile long mockTotalMemory;
 
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
@@ -74,13 +74,18 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             dropMessages.remove(msgClass);
         }
 
-        void setMockTotalMemory(long mockTotalMemory) {
-            this.mockTotalMemory = mockTotalMemory;
-        }
+        void setMockTotalMemory(final long mockTotalMemory) {
+            if(mockTotalMemory > 0) {
+                getRaftActorContext().setTotalMemoryRetriever(new Supplier<Long>() {
+                    @Override
+                    public Long get() {
+                        return mockTotalMemory;
+                    }
 
-        @Override
-        protected long getTotalMemory() {
-            return mockTotalMemory > 0 ? mockTotalMemory : super.getTotalMemory();
+                });
+            } else {
+                getRaftActorContext().setTotalMemoryRetriever(null);
+            }
         }
 
         @Override
index 885c3ab..8fdb7ea 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import akka.japi.Procedure;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -220,5 +221,9 @@ public class AbstractReplicatedLogImplTest {
         public List<ReplicatedLogEntry> getEntriesTill(final int index) {
             return journal.subList(0, index);
         }
+
+        @Override
+        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+        }
     }
 }
index 53cca23..63f0df2 100644 (file)
@@ -12,7 +12,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.japi.Procedure;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.protobuf.GeneratedMessage;
 import java.io.Serializable;
 import java.util.HashMap;
@@ -203,6 +205,20 @@ public class MockRaftActorContext implements RaftActorContext {
         this.configParams = configParams;
     }
 
+    @Override
+    public long getTotalMemory() {
+        return Runtime.getRuntime().totalMemory();
+    }
+
+    @Override
+    public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+    }
+
+    @Override
+    public boolean hasFollowers() {
+        return getPeerAddresses().keySet().size() > 0;
+    }
+
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
@@ -217,6 +233,19 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public void removeFromAndPersist(long index) {
             removeFrom(index);
         }
+
+        @Override
+        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+            append(replicatedLogEntry);
+
+            if(callback != null) {
+                try {
+                    callback.apply(replicatedLogEntry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
     public static class MockPayload extends Payload implements Serializable {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.