Bug 7362: Notify applyState synchronously 04/49404/7
authorTom Pantelis <tpanteli@brocade.com>
Thu, 15 Dec 2016 06:05:53 +0000 (01:05 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 6 Jan 2017 12:45:28 +0000 (12:45 +0000)
As per comments in bug 7362, it's problematic for snapshots that we
set lastApplied index but queue the ApplyState message. This can leave
the derived actor's captured state out-of-sync with the lastApplied index
if a snapshot occurs in between. We need to set lastApplied atomically
with apply to state so I added a callback that RaftActor sets in the
RaftActorContext that is called by the behavior's applyLogToStateMachine
method. The callback immediately calls applyState. The ApplyState message
is still queued for other compenents that trigger on it (and also unit
tests).

I considered setting lastApplied when the ApplyState message is processed,
and not in applyLogToStateMachine, but this would be problematic if
another AppendEntriesReply was queued before the ApplyState message
as it would try to apply it again.

Change-Id: Ie062af8440bc251eec9c9ef58f450dee23abd04c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
20 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index fff6ce9..2d2fce2 100644 (file)
@@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -132,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
-            delegatingPersistenceProvider, LOG);
+            delegatingPersistenceProvider, this::handleApplyState, LOG);
 
         context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
@@ -224,29 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
             return;
         }
-
         if (message instanceof ApplyState) {
             ApplyState applyState = (ApplyState) message;
 
-            long startTime = System.nanoTime();
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{}: Applying state for log index {} data {}",
-                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) {
-                applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            long elapsedTime = System.nanoTime() - startTime;
-            if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
-                LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
-                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
-            }
-
             if (!hasFollowers()) {
                 // for single node, the capture should happen after the apply state
                 // as we delete messages from the persistent journal which have made it to the snapshot
@@ -257,9 +238,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
-            // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState.
             possiblyHandleBehaviorMessage(message);
-
         } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
@@ -500,6 +479,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    private void handleApplyState(ApplyState applyState) {
+        long startTime = System.nanoTime();
+
+        Payload payload = applyState.getReplicatedLogEntry().getData();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Applying state for log index {} data {}",
+                persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+        }
+
+        if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+            applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
+        }
+
+        long elapsedTime = System.nanoTime() - startTime;
+        if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+            LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+        }
+
+        // Send the ApplyState message back to self to handle further processing asynchronously.
+        self().tell(applyState, self());
+    }
+
     protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
@@ -545,7 +547,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 raftContext.setLastApplied(persistedLogEntry.getIndex());
 
                 // Apply the state immediately.
-                self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
+                handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
index 2896c44..74a214f 100644 (file)
@@ -16,10 +16,12 @@ import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
@@ -316,4 +318,12 @@ public interface RaftActorContext {
      * @return current behavior.
      */
     RaftActorBehavior getCurrentBehavior();
+
+    /**
+     * Returns the consumer of ApplyState operations. This is invoked by a behavior when a log entry needs to be
+     * applied to the state.
+     *
+     * @return the Consumer
+     */
+    Consumer<ApplyState> getApplyStateConsumer();
 }
index dfbffb7..43a58b9 100644 (file)
@@ -24,8 +24,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
@@ -82,20 +85,25 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private Optional<Cluster> cluster;
 
+    private final Consumer<ApplyState> applyStateConsumer;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
-            ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
-            ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
+            @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
+            @Nonnull Map<String, String> peerAddresses,
+            @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
+            @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
-        this.termInformation = termInformation;
+        this.termInformation = Preconditions.checkNotNull(termInformation);
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.configParams = configParams;
-        this.persistenceProvider = persistenceProvider;
-        this.log = logger;
+        this.configParams = Preconditions.checkNotNull(configParams);
+        this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
+        this.log = Preconditions.checkNotNull(logger);
+        this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
 
-        for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
+        for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
         }
     }
@@ -386,6 +394,11 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.currentBehavior = Preconditions.checkNotNull(behavior);
     }
 
+    @Override
+    public Consumer<ApplyState> getApplyStateConsumer() {
+        return applyStateConsumer;
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     void close() {
         if (currentBehavior != null) {
index 6e34ff0..71576f6 100644 (file)
@@ -205,9 +205,17 @@ public interface ReplicatedLog {
     int dataSize();
 
     /**
-     * Determines if a snapshot need to be captured based on the count/memory consumed.
+     * Determines if a snapshot needs to be captured based on the count/memory consumed and initiates the capture.
      *
      * @param replicatedLogEntry the last log entry.
      */
     void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
+
+    /**
+     * Determines if a snapshot should be captured based on the count/memory consumed.
+     *
+     * @param logIndex the log index to use to determine if the log count has exceeded the threshold
+     * @return true if a snapshot should be captured, false otherwise
+     */
+    boolean shouldCaptureSnapshot(long logIndex);
 }
index 04606fb..fe87334 100644 (file)
@@ -53,12 +53,17 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+    public boolean shouldCaptureSnapshot(long logIndex) {
         final ConfigParams config = context.getConfigParams();
-        final long journalSize = replicatedLogEntry.getIndex() + 1;
+        final long journalSize = logIndex + 1;
         final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
 
-        if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) {
+        return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold;
+    }
+
+    @Override
+    public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+        if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) {
             boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
                     context.getCurrentBehavior().getReplicatedToAllIndex());
             if (started && !context.hasFollowers()) {
index 4fae48e..1b3abff 100644 (file)
@@ -323,14 +323,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
 
             if (replicatedCount == 0) {
-                // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
-                // we should be able to update the commit index if we get a consensus amongst the followers
-                // w/o the local persistence ack however this can cause timing issues with snapshot capture
-                // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
-                // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
-                // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
-                // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
-                // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+                // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+                // amongst the followers w/o the local persistence ack.
                 break;
             }
 
index 5c5c520..80ed698 100644 (file)
@@ -367,7 +367,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param index the log index
      */
     protected void applyLogToStateMachine(final long index) {
-        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
 
@@ -376,16 +375,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 // Send a local message to the local RaftActor (it's derived class to be
                 // specific to apply the log to it's index)
 
-                final ApplyState msg;
+                final ApplyState applyState;
                 final ClientRequestTracker tracker = removeClientRequestTracker(i);
                 if (tracker != null) {
-                    msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+                    applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
                 } else {
-                    msg = new ApplyState(null, null, replicatedLogEntry);
+                    applyState = new ApplyState(null, null, replicatedLogEntry);
                 }
 
-                actor().tell(msg, actor());
-                newLastApplied = i;
+                log.debug("{}: Setting last applied to {}", logName(), i);
+
+                context.setLastApplied(i);
+                context.getApplyStateConsumer().accept(applyState);
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
@@ -395,10 +396,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             }
         }
 
-        log.debug("{}: Setting last applied to {}", logName(), newLastApplied);
-
-        context.setLastApplied(newLastApplied);
-
         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
         // will be used during recovery
         //in case if the above code throws an error and this message is not sent, it would be fine
index 7e8a772..6f107e9 100644 (file)
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -55,9 +56,6 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
-    private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
-        logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
-
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
@@ -236,6 +234,22 @@ public class Follower extends AbstractRaftActorBehavior {
             log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
                     lastIndex, addEntriesFrom);
 
+            // When persistence successfully completes for each new log entry appended, we need to determine if we
+            // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not
+            // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However
+            // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted.
+            // This is done because subsequent log entries after the one that tripped the threshold may have been
+            // applied to the state already, as the persistence callback occurs async, and we want those entries
+            // purged from the persisted log as well.
+            final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
+            final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+                final ReplicatedLogEntry lastEntryToAppend = appendEntries.getEntries().get(
+                        appendEntries.getEntries().size() - 1);
+                if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
+                    context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex());
+                }
+            };
+
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
                 ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
@@ -244,6 +258,9 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
 
+                shouldCaptureSnapshot.compareAndSet(false,
+                        context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex()));
+
                 if (entry.getData() instanceof ServerConfigurationPayload) {
                     context.updatePeerIds((ServerConfigurationPayload)entry.getData());
                 }
index a37a131..8ff6831 100644 (file)
@@ -333,5 +333,10 @@ public class AbstractReplicatedLogImplTest {
         @Override
         public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
         }
+
+        @Override
+        public boolean shouldCaptureSnapshot(long logIndex) {
+            return false;
+        }
     }
 }
index b4b558b..dbced5f 100644 (file)
@@ -85,6 +85,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
     }
 
+    @Override
+    public RaftActorContext getRaftActorContext() {
+        return super.getRaftActorContext();
+    }
+
     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
         return snapshotMessageSupport;
     }
index 65f0a62..cdcaadf 100644 (file)
@@ -62,13 +62,14 @@ public class MockRaftActorContext extends RaftActorContextImpl {
 
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
     public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+            new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
+            applyState -> actor.tell(applyState, actor), LOG);
 
         this.system = system;
 
@@ -142,6 +143,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
         }
 
+        @Override
+        public boolean shouldCaptureSnapshot(long logIndex) {
+            return false;
+        }
+
         @Override
         public boolean removeFromAndPersist(long index) {
             return removeFrom(index) >= 0;
index 874a17c..56523cb 100644 (file)
@@ -59,7 +59,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
-                peerMap, configParams, new NonPersistentDataProvider(), log);
+                peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, log);
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
@@ -84,7 +84,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                new NonPersistentDataProvider(), log);
+                new NonPersistentDataProvider(), applyState -> { }, log);
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@@ -98,7 +98,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), log);
+                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, log);
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
index 9c42101..e0abd17 100644 (file)
@@ -77,7 +77,8 @@ public class RaftActorRecoverySupportTest {
         MockitoAnnotations.initMocks(this);
 
         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
-                LOG), -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+                LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
+                mockPersistence, applyState -> { }, LOG);
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
index af408e6..a19281f 100644 (file)
@@ -204,9 +204,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
                 newFollowerActorContext.getPeerIds());
 
-        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
-        expectFirstMatching(followerActor, ApplyState.class);
-
         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
@@ -1479,7 +1476,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
+                id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
+                noPersistence, applyState -> actor.tell(applyState, actor), LOG);
     }
 
     abstract static class AbstractMockRaftActor extends MockRaftActor {
index b9b63dc..45b89b7 100644 (file)
@@ -64,8 +64,8 @@ public class RaftActorSnapshotMessageSupportTest {
         MockitoAnnotations.initMocks(this);
 
         context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
-                new ElectionTermImpl(mockPersistence, "test", LOG),
-                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG) {
+                new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
+                configParams, mockPersistence, applyState -> { }, LOG) {
             @Override
             public SnapshotManager getSnapshotManager() {
                 return mockSnapshotManager;
index 9e35461..edc5109 100644 (file)
@@ -416,7 +416,7 @@ public class RaftActorTest extends AbstractActorTest {
         ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F"));
 
         final Identifier id = new MockIdentifier("apply-state");
-        mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
+        mockRaftActor.getRaftActorContext().getApplyStateConsumer().accept(new ApplyState(mockActorRef, id, entry));
 
         verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
     }
index c2321ca..ff4d097 100644 (file)
@@ -57,8 +57,8 @@ public class ReplicatedLogImplTest {
         MockitoAnnotations.initMocks(this);
 
         context = new RaftActorContextImpl(null, null, "test",
-                new ElectionTermImpl(mockPersistence, "test", LOG),
-                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+                new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
+                configParams, mockPersistence, applyState -> { }, LOG);
     }
 
     private void verifyPersist(Object message) throws Exception {
index 88a2613..4124a94 100644 (file)
@@ -177,7 +177,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
-                new NonPersistentDataProvider(), LOG);
+                new NonPersistentDataProvider(), applyState -> { }, LOG);
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
index 9d64b14..8cb914c 100644 (file)
@@ -21,8 +21,14 @@ import static org.mockito.Mockito.verify;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.util.ArrayList;
@@ -31,15 +37,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActor;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorTest;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -50,9 +62,14 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -1087,6 +1104,208 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
     }
 
+    @Test
+    public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 1 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
+                MockRaftActor.toObject(snapshot.getState()));
+    }
+
+    @Test
+    public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 2 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
+
+        // Reinstate the actor from persistence
+
+        actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
+
+        followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
+        assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
+        assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
+        assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), followerRaftActor.get().getState());
+    }
+
+    @Test
+    public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
+        String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(1);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 0 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
+        assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
+                MockRaftActor.toObject(snapshot.getState()));
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
+        RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+            @Override
+            public void createSnapshot(ActorRef actorRef) {
+                try {
+                    actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
+                            followerRaftActor.get().getState()).toByteArray()), actorRef);
+                } catch (Exception e) {
+                    Throwables.propagate(e);
+                }
+            }
+
+            @Override
+            public void applySnapshot(byte[] snapshotBytes) {
+            }
+        };
+        return snapshotCohort;
+    }
+
     public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
         int snapshotLength = bs.size();
         int start = offset;
index ddd2cb0..ab07b66 100644 (file)
@@ -86,7 +86,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -472,13 +471,6 @@ public class ShardTest extends AbstractShardTest {
 
         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         store.setSchemaContext(SCHEMA_CONTEXT);
-        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        final NormalizedNode<?, ?> root = readStore(store, YangInstanceIdentifier.EMPTY);
-        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
-
-        shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
         final DataTreeModification writeMod = store.takeSnapshot().newModification();
         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -486,10 +478,7 @@ public class ShardTest extends AbstractShardTest {
         writeMod.ready();
 
         final TransactionIdentifier tx = nextTransactionId();
-        final ApplyState applyState = new ApplyState(null, tx,
-                new SimpleReplicatedLogEntry(1, 2, payloadForModification(store, writeMod, tx)));
-
-        shard.tell(applyState, shard);
+        shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
 
         final Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.