*/
public class DefaultConfigParamsImpl implements ConfigParams {
- private static final int SNAPSHOT_BATCH_COUNT = 100000;
+ private static final int SNAPSHOT_BATCH_COUNT = 20000;
/**
* The maximum election time variance
@Override public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
- LOG.debug("SnapshotOffer called..");
+ LOG.info("SnapshotOffer called..");
SnapshotOffer offer = (SnapshotOffer) message;
Snapshot snapshot = (Snapshot) offer.snapshot();
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
- LOG.debug("Applied snapshot to replicatedLog. " +
- "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ LOG.info("Applied snapshot to replicatedLog. " +
+ "snapshotIndex={}, snapshotTerm={}, journal-size={}",
replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
- replicatedLog.size());
+ replicatedLog.size()
+ );
// Apply the snapshot to the actors state
applySnapshot(ByteString.copyFrom(snapshot.getState()));
context.removePeer(rrp.getName());
} else if (message instanceof CaptureSnapshot) {
- LOG.debug("CaptureSnapshot received by actor");
+ LOG.info("CaptureSnapshot received by actor");
CaptureSnapshot cs = (CaptureSnapshot)message;
captureSnapshot = cs;
createSnapshot();
} else if (message instanceof CaptureSnapshotReply){
- LOG.debug("CaptureSnapshotReply received by actor");
+ LOG.info("CaptureSnapshotReply received by actor");
CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
ByteString stateInBytes = csr.getSnapshot();
- LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+ LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
handleCaptureSnapshotReply(stateInBytes);
} else {
.tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
self());
+ createSnapshotTransaction = null;
// Send a PoisonPill instead of sending close transaction because we do not really need
// a response
getSender().tell(PoisonPill.getInstance(), self());
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
+
+ LOG.info("Applying snapshot");
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
syncCommitTransaction(transaction);
} catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred when applying snapshot");
+ } finally {
+ LOG.info("Done applying snapshot");
}
}
System.setProperty("shard.persistent", "false");
system = ActorSystem.create("test");
+
+ deletePersistenceFiles();
}
@AfterClass
public static void tearDownClass() throws IOException {
JavaTestKit.shutdownActorSystem(system);
system = null;
+
+ deletePersistenceFiles();
}
protected static void deletePersistenceFiles() throws IOException {
subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
getRef());
- waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+ subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
+ getRef());
+
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
}
};
- Thread.sleep(2000);
deletePersistenceFiles();
}};
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.collect.Iterables;
+import scala.concurrent.Future;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InMemorySnapshotStore extends SnapshotStore {
+
+ Map<String, List<Snapshot>> snapshots = new HashMap<>();
+
+ @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+ SnapshotSelectionCriteria snapshotSelectionCriteria) {
+ List<Snapshot> snapshotList = snapshots.get(s);
+ if(snapshotList == null){
+ return Futures.successful(Option.<SelectedSnapshot>none());
+ }
+
+ Snapshot snapshot = Iterables.getLast(snapshotList);
+ SelectedSnapshot selectedSnapshot =
+ new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
+ return Futures.successful(Option.some(selectedSnapshot));
+ }
+
+ @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+ List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+ if(snapshotList == null){
+ snapshotList = new ArrayList<>();
+ snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
+ }
+ snapshotList.add(new Snapshot(snapshotMetadata, o));
+
+ return Futures.successful(null);
+ }
+
+ @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+ }
+
+ @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+ List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+ if(snapshotList == null){
+ return;
+ }
+
+ int deleteIndex = -1;
+
+ for(int i=0;i<snapshotList.size(); i++){
+ Snapshot snapshot = snapshotList.get(i);
+ if(snapshotMetadata.equals(snapshot.getMetadata())){
+ deleteIndex = i;
+ break;
+ }
+ }
+
+ if(deleteIndex != -1){
+ snapshotList.remove(deleteIndex);
+ }
+
+ }
+
+ @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+ throws Exception {
+ List<Snapshot> snapshotList = snapshots.get(s);
+
+ if(snapshotList == null){
+ return;
+ }
+
+ // TODO : This is a quick and dirty implementation. Do actual match later.
+ snapshotList.clear();
+ snapshots.remove(s);
+ }
+
+ private static class Snapshot {
+ private final SnapshotMetadata metadata;
+ private final Object data;
+
+ private Snapshot(SnapshotMetadata metadata, Object data) {
+ this.metadata = metadata;
+ this.data = data;
+ }
+
+ public SnapshotMetadata getMetadata() {
+ return metadata;
+ }
+
+ public Object getData() {
+ return data;
+ }
+ }
+}
akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+
loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
actor {
}
}
}
+
+in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000