import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
-import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
@After
public void tearDown() throws Exception {
factory.close();
- MockAkkaJournal.clearJournal();
- MockSnapshotStore.setMockSnapshot(null);
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
public static class MockRaftActor extends RaftActor {
Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
lastAppliedDuringSnapshotCapture, 1);
- MockSnapshotStore.setMockSnapshot(snapshot);
- MockSnapshotStore.setPersistenceId(persistenceId);
+ InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
// add more entries after snapshot is taken
List<ReplicatedLogEntry> entries = new ArrayList<>();
int lastAppliedToState = 5;
int lastIndex = 7;
- MockAkkaJournal.addToJournal(5, entry2);
+ InMemoryJournal.addEntry(persistenceId, 5, entry2);
// 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
- MockAkkaJournal.addToJournal(7, entry3);
- MockAkkaJournal.addToJournal(8, entry4);
-
+ InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
+ InMemoryJournal.addEntry(persistenceId, 7, entry3);
+ InMemoryJournal.addEntry(persistenceId, 8, entry4);
// kill the actor
followerActor.tell(PoisonPill.getInstance(), null);
new MockRaftActorContext.MockPayload("two"));
long seqNr = 1;
- MockAkkaJournal.addToJournal(seqNr++, entry0);
- MockAkkaJournal.addToJournal(seqNr++, entry1);
- MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
- MockAkkaJournal.addToJournal(seqNr++, entry2);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
int lastAppliedToState = 1;
int lastIndex = 2;
/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore.utils;
+package org.opendaylight.controller.cluster.raft.utils;
-import static org.junit.Assert.assertEquals;
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
-import akka.persistence.PersistentImpl;
-import akka.persistence.PersistentRepr;
-import akka.persistence.journal.japi.AsyncWriteJournal;
+/**
+ * An akka AsyncWriteJournal implementation that stores data in memory. This is intended for testing.
+ *
+ * @author Thomas Pantelis
+ */
public class InMemoryJournal extends AsyncWriteJournal {
+ static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class);
+
private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> writeMessagesCompleteLatches = new ConcurrentHashMap<>();
+
private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
public static void addEntry(String persistenceId, long sequenceNr, Object data) {
journals.clear();
}
+ @SuppressWarnings("unchecked")
+ public static <T> List<T> get(String persistenceId, Class<T> type) {
+ Map<Long, Object> journalMap = journals.get(persistenceId);
+ if(journalMap == null) {
+ return Collections.<T>emptyList();
+ }
+
+ synchronized (journalMap) {
+ List<T> journal = new ArrayList<>(journalMap.size());
+ for(Object entry: journalMap.values()) {
+ if(type.isInstance(entry)) {
+ journal.add((T) entry);
+ }
+ }
+
+ return journal;
+ }
+ }
+
public static Map<Long, Object> get(String persistenceId) {
- Map<Long, Object> journal = journals.get(persistenceId);
- return journal != null ? journal : Collections.<Long, Object>emptyMap();
+ Map<Long, Object> journalMap = journals.get(persistenceId);
+ return journalMap != null ? journalMap : Collections.<Long, Object>emptyMap();
+ }
+
+ public static void dumpJournal(String persistenceId) {
+ StringBuilder builder = new StringBuilder(String.format("Journal log for %s:", persistenceId));
+ Map<Long, Object> journalMap = journals.get(persistenceId);
+ if(journalMap != null) {
+ synchronized (journalMap) {
+ for(Map.Entry<Long, Object> e: journalMap.entrySet()) {
+ builder.append("\n ").append(e.getKey()).append(" = ").append(e.getValue());
+ }
+ }
+ }
+
+ LOG.info(builder.toString());
}
public static void waitForDeleteMessagesComplete(String persistenceId) {
- assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(
- deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS));
+ if(!Uninterruptibles.awaitUninterruptibly(deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+ throw new AssertionError("Delete messages did not complete");
+ }
+ }
+
+ public static void waitForWriteMessagesComplete(String persistenceId) {
+ if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+ throw new AssertionError("Journal write messages did not complete");
+ }
}
public static void addDeleteMessagesCompleteLatch(String persistenceId) {
deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
}
+ public static void addWriteMessagesCompleteLatch(String persistenceId, int count) {
+ writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count));
+ }
+
public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
blockReadMessagesLatches.put(persistenceId, latch);
}
@Override
public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
- return Futures.successful(-1L);
+ // Akka calls this during recovery.
+
+ Map<Long, Object> journal = journals.get(persistenceId);
+ if(journal == null) {
+ return Futures.successful(-1L);
+ }
+
+ synchronized (journal) {
+ long highest = -1;
+ for (Long seqNr : journal.keySet()) {
+ if(seqNr.longValue() >= fromSequenceNr && seqNr.longValue() > highest) {
+ highest = seqNr.longValue();
+ }
+ }
+
+ return Futures.successful(highest);
+ }
}
@Override
}
synchronized (journal) {
+ LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+ repr.sequenceNr(), repr.payload());
journal.put(repr.sequenceNr(), repr.payload());
}
+
+ CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId());
+ if(latch != null) {
+ latch.countDown();
+ }
}
+
return null;
}
}, context().dispatcher());
/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore.utils;
+package org.opendaylight.controller.cluster.raft.utils;
import akka.dispatch.Futures;
import akka.japi.Option;
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.snapshot.japi.SnapshotStore;
import com.google.common.collect.Iterables;
-import scala.concurrent.Future;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+/**
+ * An akka SnapshotStore implementation that stores data in memory. This is intended for testing.
+ *
+ * @author Thomas Pantelis
+ */
public class InMemorySnapshotStore extends SnapshotStore {
+ static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class);
+
private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
- public static void addSnapshot(String persistentId, Snapshot snapshot) {
+ public static void addSnapshot(String persistentId, Object snapshot) {
List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
if(snapshotList == null) {
snapshots.put(persistentId, snapshotList);
}
- snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
- System.currentTimeMillis()), snapshot));
+ synchronized (snapshotList) {
+ snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
+ System.currentTimeMillis()), snapshot));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> List<T> getSnapshots(String persistentId, Class<T> type) {
+ List<StoredSnapshot> stored = snapshots.get(persistentId);
+ if(stored == null) {
+ return Collections.emptyList();
+ }
+
+ List<T> retList;
+ synchronized (stored) {
+ retList = Lists.newArrayListWithCapacity(stored.size());
+ for(StoredSnapshot s: stored) {
+ if(type.isInstance(s.getData())) {
+ retList.add((T) s.getData());
+ }
+ }
+ }
+
+ return retList;
}
public static void clear() {
snapshotList = new ArrayList<>();
snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
}
- snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
+ synchronized (snapshotList) {
+ snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
+ }
return Futures.successful(null);
}
int deleteIndex = -1;
- for(int i=0;i<snapshotList.size(); i++){
- StoredSnapshot snapshot = snapshotList.get(i);
- if(snapshotMetadata.equals(snapshot.getMetadata())){
- deleteIndex = i;
- break;
+ synchronized (snapshotList) {
+ for(int i=0;i<snapshotList.size(); i++){
+ StoredSnapshot snapshot = snapshotList.get(i);
+ if(snapshotMetadata.equals(snapshot.getMetadata())){
+ deleteIndex = i;
+ break;
+ }
}
- }
- if(deleteIndex != -1){
- snapshotList.remove(deleteIndex);
+ if(deleteIndex != -1){
+ snapshotList.remove(deleteIndex);
+ }
}
-
}
@Override
- public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+ public void doDelete(String persistentId, SnapshotSelectionCriteria snapshotSelectionCriteria)
throws Exception {
- List<StoredSnapshot> snapshotList = snapshots.get(s);
+ List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
if(snapshotList == null){
return;
}
- // TODO : This is a quick and dirty implementation. Do actual match later.
- snapshotList.clear();
- snapshots.remove(s);
+ synchronized (snapshotList) {
+ Iterator<StoredSnapshot> iter = snapshotList.iterator();
+ while(iter.hasNext()) {
+ StoredSnapshot s = iter.next();
+ LOG.trace("doDelete: sequenceNr: {}, maxSequenceNr: {}", s.getMetadata().sequenceNr(),
+ snapshotSelectionCriteria.maxSequenceNr());
+
+ if(s.getMetadata().sequenceNr() <= snapshotSelectionCriteria.maxSequenceNr()) {
+ iter.remove();
+ }
+ }
+ }
}
private static class StoredSnapshot {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.raft.utils;
-
-import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
-import akka.persistence.PersistentImpl;
-import akka.persistence.PersistentRepr;
-import akka.persistence.journal.japi.AsyncWriteJournal;
-import com.google.common.collect.Maps;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import scala.concurrent.Future;
-
-public class MockAkkaJournal extends AsyncWriteJournal {
-
- private static Map<Long, Object> journal = Maps.newLinkedHashMap();
-
- public static void addToJournal(long sequenceNr, Object message) {
- journal.put(sequenceNr, message);
- }
-
- public static void clearJournal() {
- journal.clear();
- }
-
- @Override
- public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
- long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
-
- return Futures.future(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- for (Map.Entry<Long,Object> entry : journal.entrySet()) {
- PersistentRepr persistentMessage =
- new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
- replayCallback.apply(persistentMessage);
- }
- return null;
- }
- }, context().dispatcher());
- }
-
- @Override
- public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
- return Futures.successful(new Long(0));
- }
-
- @Override
- public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> persistentReprs) {
- return Futures.successful(null);
- }
-
- @Override
- public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> persistentConfirmations) {
- return Futures.successful(null);
- }
-
- @Override
- public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds, boolean b) {
- return Futures.successful(null);
- }
-
- @Override
- public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
- return Futures.successful(null);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.utils;
-
-import akka.dispatch.Futures;
-import akka.japi.Option;
-import akka.persistence.SelectedSnapshot;
-import akka.persistence.SnapshotMetadata;
-import akka.persistence.SnapshotSelectionCriteria;
-import akka.persistence.snapshot.japi.SnapshotStore;
-import org.opendaylight.controller.cluster.raft.Snapshot;
-import scala.concurrent.Future;
-
-
-public class MockSnapshotStore extends SnapshotStore {
-
- private static Snapshot mockSnapshot;
- private static String persistenceId;
-
- public static void setMockSnapshot(Snapshot s) {
- mockSnapshot = s;
- }
-
- public static void setPersistenceId(String pId) {
- persistenceId = pId;
- }
-
- @Override
- public Future<Option<SelectedSnapshot>> doLoadAsync(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) {
- if (mockSnapshot == null) {
- return Futures.successful(Option.<SelectedSnapshot>none());
- }
-
- SnapshotMetadata smd = new SnapshotMetadata(persistenceId, 1, 12345);
- SelectedSnapshot selectedSnapshot =
- new SelectedSnapshot(smd, mockSnapshot);
- return Futures.successful(Option.some(selectedSnapshot));
- }
-
- @Override
- public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
- return null;
- }
-
- @Override
- public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
-
- }
-
- @Override
- public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
-
- }
-
- @Override
- public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria) throws Exception {
-
- }
-}
mock-snapshot-store {
# Class name of the plugin.
- class = "org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore"
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}
mock-journal {
# Class name of the plugin.
- class = "org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal"
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
private static final long serialVersionUID = 1L;
- private Creator<ShardManager> delegate;
+ private final Creator<ShardManager> delegate;
public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
this.delegate = delegate;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
}
in-memory-journal {
- class = "org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal"
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
}
in-memory-snapshot-store {
# Class name of the plugin.
- class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}