Refactor InMemoryJournal and InMemorySnapshot to sal-akka-raft 58/16358/5
authorTom Pantelis <tpanteli@brocade.com>
Wed, 11 Mar 2015 21:40:00 +0000 (17:40 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 13 Mar 2015 05:11:23 +0000 (01:11 -0400)
Moved the InMemoryJournal and InMemorySnapshot test classes from
sal-distributed-datastore to sal-akka-raft as they provide more
complete implementations than the MockAkkaJournal and
MockSnapshotStore. The latter were removed. sal-distributed-datastore
already has a dependency on sal-akka-raft's test-jar.

Change-Id: Ief82b05005699aec38d069f7ce229c56002ab8b0
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java with 61% similarity]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java with 53% similarity]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index c0bdc53..d90494f 100644 (file)
@@ -69,9 +69,9 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
-import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -89,8 +89,8 @@ public class RaftActorTest extends AbstractActorTest {
     @After
     public void tearDown() throws Exception {
         factory.close();
-        MockAkkaJournal.clearJournal();
-        MockSnapshotStore.setMockSnapshot(null);
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
     public static class MockRaftActor extends RaftActor {
@@ -377,8 +377,7 @@ public class RaftActorTest extends AbstractActorTest {
             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
                     lastAppliedDuringSnapshotCapture, 1);
-            MockSnapshotStore.setMockSnapshot(snapshot);
-            MockSnapshotStore.setPersistenceId(persistenceId);
+            InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
 
             // add more entries after snapshot is taken
             List<ReplicatedLogEntry> entries = new ArrayList<>();
@@ -395,12 +394,11 @@ public class RaftActorTest extends AbstractActorTest {
             int lastAppliedToState = 5;
             int lastIndex = 7;
 
-            MockAkkaJournal.addToJournal(5, entry2);
+            InMemoryJournal.addEntry(persistenceId, 5, entry2);
             // 2 entries are applied to state besides the 4 entries in snapshot
-            MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
-            MockAkkaJournal.addToJournal(7, entry3);
-            MockAkkaJournal.addToJournal(8, entry4);
-
+            InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
+            InMemoryJournal.addEntry(persistenceId, 7, entry3);
+            InMemoryJournal.addEntry(persistenceId, 8, entry4);
 
             // kill the actor
             followerActor.tell(PoisonPill.getInstance(), null);
@@ -442,10 +440,10 @@ public class RaftActorTest extends AbstractActorTest {
                     new MockRaftActorContext.MockPayload("two"));
 
             long seqNr = 1;
-            MockAkkaJournal.addToJournal(seqNr++, entry0);
-            MockAkkaJournal.addToJournal(seqNr++, entry1);
-            MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
-            MockAkkaJournal.addToJournal(seqNr++, entry2);
+            InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
+            InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
+            InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
+            InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
 
             int lastAppliedToState = 1;
             int lastIndex = 2;
@@ -1,37 +1,49 @@
 /*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore.utils;
+package org.opendaylight.controller.cluster.raft.utils;
 
-import static org.junit.Assert.assertEquals;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
-import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
-import akka.persistence.PersistentImpl;
-import akka.persistence.PersistentRepr;
-import akka.persistence.journal.japi.AsyncWriteJournal;
 
+/**
+ * An akka AsyncWriteJournal implementation that stores data in memory. This is intended for testing.
+ *
+ * @author Thomas Pantelis
+ */
 public class InMemoryJournal extends AsyncWriteJournal {
 
+    static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class);
+
     private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
 
     private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
 
+    private static final Map<String, CountDownLatch> writeMessagesCompleteLatches = new ConcurrentHashMap<>();
+
     private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
 
     public static void addEntry(String persistenceId, long sequenceNr, Object data) {
@@ -50,20 +62,64 @@ public class InMemoryJournal extends AsyncWriteJournal {
         journals.clear();
     }
 
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> get(String persistenceId, Class<T> type) {
+        Map<Long, Object> journalMap = journals.get(persistenceId);
+        if(journalMap == null) {
+            return Collections.<T>emptyList();
+        }
+
+        synchronized (journalMap) {
+            List<T> journal = new ArrayList<>(journalMap.size());
+            for(Object entry: journalMap.values()) {
+                if(type.isInstance(entry)) {
+                    journal.add((T) entry);
+                }
+            }
+
+            return journal;
+        }
+    }
+
     public static Map<Long, Object> get(String persistenceId) {
-        Map<Long, Object> journal = journals.get(persistenceId);
-        return journal != null ? journal : Collections.<Long, Object>emptyMap();
+        Map<Long, Object> journalMap = journals.get(persistenceId);
+        return journalMap != null ? journalMap : Collections.<Long, Object>emptyMap();
+    }
+
+    public static void dumpJournal(String persistenceId) {
+        StringBuilder builder = new StringBuilder(String.format("Journal log for %s:", persistenceId));
+        Map<Long, Object> journalMap = journals.get(persistenceId);
+        if(journalMap != null) {
+            synchronized (journalMap) {
+                for(Map.Entry<Long, Object> e: journalMap.entrySet()) {
+                    builder.append("\n    ").append(e.getKey()).append(" = ").append(e.getValue());
+                }
+            }
+        }
+
+        LOG.info(builder.toString());
     }
 
     public static void waitForDeleteMessagesComplete(String persistenceId) {
-        assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(
-                deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS));
+        if(!Uninterruptibles.awaitUninterruptibly(deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+            throw new AssertionError("Delete messages did not complete");
+        }
+    }
+
+    public static void waitForWriteMessagesComplete(String persistenceId) {
+        if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+            throw new AssertionError("Journal write messages did not complete");
+        }
     }
 
     public static void addDeleteMessagesCompleteLatch(String persistenceId) {
         deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
     }
 
+    public static void addWriteMessagesCompleteLatch(String persistenceId, int count) {
+        writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count));
+    }
+
     public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
         blockReadMessagesLatches.put(persistenceId, latch);
     }
@@ -100,7 +156,23 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
-        return Futures.successful(-1L);
+        // Akka calls this during recovery.
+
+        Map<Long, Object> journal = journals.get(persistenceId);
+        if(journal == null) {
+            return Futures.successful(-1L);
+        }
+
+        synchronized (journal) {
+            long highest = -1;
+            for (Long seqNr : journal.keySet()) {
+                if(seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) {
+                    highest = seqNr.longValue();
+                }
+            }
+
+            return Futures.successful(highest);
+        }
     }
 
     @Override
@@ -116,9 +188,17 @@ public class InMemoryJournal extends AsyncWriteJournal {
                     }
 
                     synchronized (journal) {
+                        LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+                                repr.sequenceNr(), repr.payload());
                         journal.put(repr.sequenceNr(), repr.payload());
                     }
+
+                    CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId());
+                    if(latch != null) {
+                        latch.countDown();
+                    }
                 }
+
                 return null;
             }
         }, context().dispatcher());
@@ -1,12 +1,12 @@
 /*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.datastore.utils;
+package org.opendaylight.controller.cluster.raft.utils;
 
 import akka.dispatch.Futures;
 import akka.japi.Option;
@@ -15,18 +15,29 @@ import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.snapshot.japi.SnapshotStore;
 import com.google.common.collect.Iterables;
-import scala.concurrent.Future;
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
+/**
+ * An akka SnapshotStore implementation that stores data in memory. This is intended for testing.
+ *
+ * @author Thomas Pantelis
+ */
 public class InMemorySnapshotStore extends SnapshotStore {
 
+    static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class);
+
     private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
 
-    public static void addSnapshot(String persistentId, Snapshot snapshot) {
+    public static void addSnapshot(String persistentId, Object snapshot) {
         List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
 
         if(snapshotList == null) {
@@ -34,8 +45,30 @@ public class InMemorySnapshotStore extends SnapshotStore {
             snapshots.put(persistentId, snapshotList);
         }
 
-        snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
-                System.currentTimeMillis()), snapshot));
+        synchronized (snapshotList) {
+            snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
+                    System.currentTimeMillis()), snapshot));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> getSnapshots(String persistentId, Class<T> type) {
+        List<StoredSnapshot> stored = snapshots.get(persistentId);
+        if(stored == null) {
+            return Collections.emptyList();
+        }
+
+        List<T> retList;
+        synchronized (stored) {
+            retList = Lists.newArrayListWithCapacity(stored.size());
+            for(StoredSnapshot s: stored) {
+                if(type.isInstance(s.getData())) {
+                    retList.add((T) s.getData());
+                }
+            }
+        }
+
+        return retList;
     }
 
     public static void clear() {
@@ -64,7 +97,9 @@ public class InMemorySnapshotStore extends SnapshotStore {
             snapshotList = new ArrayList<>();
             snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
         }
-        snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
+        synchronized (snapshotList) {
+            snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
+        }
 
         return Futures.successful(null);
     }
@@ -83,32 +118,42 @@ public class InMemorySnapshotStore extends SnapshotStore {
 
         int deleteIndex = -1;
 
-        for(int i=0;i<snapshotList.size(); i++){
-            StoredSnapshot snapshot = snapshotList.get(i);
-            if(snapshotMetadata.equals(snapshot.getMetadata())){
-                deleteIndex = i;
-                break;
+        synchronized (snapshotList) {
+            for(int i=0;i<snapshotList.size(); i++){
+                StoredSnapshot snapshot = snapshotList.get(i);
+                if(snapshotMetadata.equals(snapshot.getMetadata())){
+                    deleteIndex = i;
+                    break;
+                }
             }
-        }
 
-        if(deleteIndex != -1){
-            snapshotList.remove(deleteIndex);
+            if(deleteIndex != -1){
+                snapshotList.remove(deleteIndex);
+            }
         }
-
     }
 
     @Override
-    public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+    public void doDelete(String persistentId, SnapshotSelectionCriteria snapshotSelectionCriteria)
         throws Exception {
-        List<StoredSnapshot> snapshotList = snapshots.get(s);
+        List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
 
         if(snapshotList == null){
             return;
         }
 
-        // TODO : This is a quick and dirty implementation. Do actual match later.
-        snapshotList.clear();
-        snapshots.remove(s);
+        synchronized (snapshotList) {
+            Iterator<StoredSnapshot> iter = snapshotList.iterator();
+            while(iter.hasNext()) {
+                StoredSnapshot s = iter.next();
+                LOG.trace("doDelete: sequenceNr: {}, maxSequenceNr: {}", s.getMetadata().sequenceNr(),
+                        snapshotSelectionCriteria.maxSequenceNr());
+
+                if(s.getMetadata().sequenceNr() <= snapshotSelectionCriteria.maxSequenceNr()) {
+                    iter.remove();
+                }
+            }
+        }
     }
 
     private static class StoredSnapshot {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java
deleted file mode 100644 (file)
index 47864be..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.raft.utils;
-
-import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
-import akka.persistence.PersistentImpl;
-import akka.persistence.PersistentRepr;
-import akka.persistence.journal.japi.AsyncWriteJournal;
-import com.google.common.collect.Maps;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import scala.concurrent.Future;
-
-public class MockAkkaJournal extends AsyncWriteJournal {
-
-    private static Map<Long, Object> journal = Maps.newLinkedHashMap();
-
-    public static void addToJournal(long sequenceNr, Object message) {
-        journal.put(sequenceNr, message);
-    }
-
-    public static void clearJournal() {
-        journal.clear();
-    }
-
-    @Override
-    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
-        long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
-
-        return Futures.future(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
-                    PersistentRepr persistentMessage =
-                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
-                    replayCallback.apply(persistentMessage);
-                }
-                return null;
-            }
-        }, context().dispatcher());
-    }
-
-    @Override
-    public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
-        return Futures.successful(new Long(0));
-    }
-
-    @Override
-    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> persistentReprs) {
-        return Futures.successful(null);
-    }
-
-    @Override
-    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> persistentConfirmations) {
-        return Futures.successful(null);
-    }
-
-    @Override
-    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds, boolean b) {
-        return Futures.successful(null);
-    }
-
-    @Override
-    public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
-        return Futures.successful(null);
-    }
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockSnapshotStore.java
deleted file mode 100644 (file)
index d70bf92..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.utils;
-
-import akka.dispatch.Futures;
-import akka.japi.Option;
-import akka.persistence.SelectedSnapshot;
-import akka.persistence.SnapshotMetadata;
-import akka.persistence.SnapshotSelectionCriteria;
-import akka.persistence.snapshot.japi.SnapshotStore;
-import org.opendaylight.controller.cluster.raft.Snapshot;
-import scala.concurrent.Future;
-
-
-public class MockSnapshotStore  extends SnapshotStore {
-
-    private static Snapshot mockSnapshot;
-    private static String persistenceId;
-
-    public static void setMockSnapshot(Snapshot s) {
-        mockSnapshot = s;
-    }
-
-    public static void setPersistenceId(String pId) {
-        persistenceId = pId;
-    }
-
-    @Override
-    public Future<Option<SelectedSnapshot>> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) {
-        if (mockSnapshot == null) {
-            return Futures.successful(Option.<SelectedSnapshot>none());
-        }
-
-        SnapshotMetadata smd = new SnapshotMetadata(persistenceId, 1, 12345);
-        SelectedSnapshot selectedSnapshot =
-            new SelectedSnapshot(smd, mockSnapshot);
-        return Futures.successful(Option.some(selectedSnapshot));
-    }
-
-    @Override
-    public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
-        return null;
-    }
-
-    @Override
-    public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
-
-    }
-
-    @Override
-    public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
-
-    }
-
-    @Override
-    public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) throws Exception {
-
-    }
-}
index 818ddf7..8a3b3e5 100644 (file)
@@ -26,14 +26,14 @@ akka {
 
 mock-snapshot-store {
   # Class name of the plugin.
-  class = "org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore"
+  class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
   # Dispatcher for the plugin actor.
   plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
 }
 
 mock-journal {
   # Class name of the plugin.
-  class = "org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal"
+  class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
   # Dispatcher for the plugin actor.
   plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
 }
index 2c41b00..3ac61f2 100644 (file)
@@ -37,8 +37,8 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
index 1ad2be7..54a9e2d 100644 (file)
@@ -20,9 +20,9 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
index c005751..9941707 100644 (file)
@@ -45,12 +45,12 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -641,7 +641,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
         private static final long serialVersionUID = 1L;
-        private Creator<ShardManager> delegate;
+        private final Creator<ShardManager> delegate;
 
         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
             this.delegate = delegate;
index f38b510..e10f566 100644 (file)
@@ -58,8 +58,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
@@ -76,6 +74,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
index ded5226..471009d 100644 (file)
@@ -45,8 +45,6 @@ import org.opendaylight.controller.cluster.datastore.modification.ModificationPa
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
@@ -55,6 +53,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
index 3a37dd9..badec6f 100644 (file)
@@ -19,12 +19,12 @@ akka {
 }
 
 in-memory-journal {
-    class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
 }
 
 in-memory-snapshot-store {
   # Class name of the plugin.
-  class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+  class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
   # Dispatcher for the plugin actor.
   plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.