hosttracker.keyscheme=IP
# LISP Flow Mapping configuration
-# Map-Register messages overwrite existing RLOC sets in EID-to-RLOC mappings
+# Map-Register messages overwrite existing RLOC sets in EID-to-RLOC mappings (default: true)
lisp.mappingOverwrite = true
-# Enable the Solicit-Map-Request (SMR) mechanism
-lisp.smr = false
+# Enable the Solicit-Map-Request (SMR) mechanism (default: true)
+lisp.smr = true
+# Choose policy for Explicit Locator Path (ELP) handling
+# There are three options:
+# default: don't add or remove locator records, return mapping as-is
+# both: keep the ELP, but add the next hop as a standalone non-LCAF locator with a lower priority
+# replace: remove the ELP, add the next hop as a standalone non-LCAF locator
+lisp.elpPolicy = default
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
/**
* A sample actor showing how the RaftActor is to be extended
*/
-public class ExampleActor extends RaftActor {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
- private final Map<String, String> state = new HashMap();
+ private final Map<String, String> state = new HashMap<>();
private long persistIdentifier = 1;
private final Optional<ActorRef> roleChangeNotifier;
}
}
- @Override protected void createSnapshot() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
ByteString bs = null;
try {
bs = fromObject(state);
getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
}
- @Override protected void applySnapshot(byte [] snapshot) {
+ @Override
+ public void applySnapshot(byte [] snapshot) {
state.clear();
try {
- state.putAll((HashMap) toObject(snapshot));
+ state.putAll((HashMap<String, String>) toObject(snapshot));
} catch (Exception e) {
LOG.error("Exception in applying snapshot", e);
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
+ LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
}
}
}
@Override
- protected void startLogRecoveryBatch(int maxBatchSize) {
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return this;
+ }
+
+ @Override
+ public void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ public void appendRecoveredLogEntry(Payload data) {
}
@Override
- protected void appendRecoveredLogEntry(Payload data) {
+ public void applyCurrentLogRecoveryBatch() {
}
@Override
- protected void applyCurrentLogRecoveryBatch() {
+ public void onRecoveryComplete() {
}
@Override
- protected void onRecoveryComplete() {
+ public void applyRecoverySnapshot(byte[] snapshot) {
}
@Override
- protected void applyRecoverySnapshot(byte[] snapshot) {
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return this;
}
}
}
// override this method to return the protobuff related extension fields and their values
- @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
- Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<>();
+ @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
+ Map<GeneratedMessage.GeneratedExtension<?, ?>, String> map = new HashMap<>();
map.put(KeyValueMessages.key, getKey());
map.put(KeyValueMessages.value, getValue());
return map;
*/
package org.opendaylight.controller.cluster.raft;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
// We define this as ArrayList so we can use ensureCapacity.
- protected ArrayList<ReplicatedLogEntry> journal;
+ private ArrayList<ReplicatedLogEntry> journal;
private long snapshotIndex = -1;
private long snapshotTerm = -1;
private ArrayList<ReplicatedLogEntry> snapshottedJournal;
private long previousSnapshotIndex = -1;
private long previousSnapshotTerm = -1;
- protected int dataSize = 0;
+ private int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
this.journal = new ArrayList<>(unAppliedEntries);
+
+ for(ReplicatedLogEntry entry: journal) {
+ dataSize += entry.size();
+ }
}
public AbstractReplicatedLogImpl() {
}
@Override
- public void removeFrom(long logEntryIndex) {
+ public long removeFrom(long logEntryIndex) {
int adjustedIndex = adjustedIndex(logEntryIndex);
if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
// physical index should be less than list size and >= 0
- return;
+ return -1;
+ }
+
+ for(int i = adjustedIndex; i < journal.size(); i++) {
+ dataSize -= journal.get(i).size();
}
+
journal.subList(adjustedIndex , journal.size()).clear();
+
+ return adjustedIndex;
}
@Override
public void append(ReplicatedLogEntry replicatedLogEntry) {
journal.add(replicatedLogEntry);
+ dataSize += replicatedLogEntry.size();
}
@Override
snapshotTerm = previousSnapshotTerm;
previousSnapshotTerm = -1;
}
+
+ @VisibleForTesting
+ ReplicatedLogEntry getAtPhysicalIndex(int index) {
+ return journal.get(index);
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
- private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
- private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
- private Stopwatch recoveryTimer;
+ private RaftActorRecoverySupport raftRecovery;
- private int currentRecoveryBatchCount;
+ private RaftActorSnapshotMessageSupport snapshotSupport;
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
}
- private void initRecoveryTimer() {
- if(recoveryTimer == null) {
- recoveryTimer = Stopwatch.createStarted();
- }
- }
-
@Override
public void preStart() throws Exception {
LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
@Override
public void postStop() {
- if(currentBehavior != null) {
+ if(currentBehavior.getDelegate() != null) {
try {
currentBehavior.close();
} catch (Exception e) {
@Override
public void handleRecover(Object message) {
- if(persistence().isRecoveryApplicable()) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyLogEntries) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
- }
- } else {
- if (message instanceof RecoveryCompleted) {
+ if(raftRecovery == null) {
+ raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+ getRaftActorRecoveryCohort());
+ }
+
+ boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+ if(recoveryComplete) {
+ if(!persistence().isRecoveryApplicable()) {
// Delete all the messages from the akka journal so that we do not end up with consistency issues
// Note I am not using the dataPersistenceProvider and directly using the akka api here
deleteMessages(lastSequenceNr());
// Delete all the akka snapshots as they will not be needed
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
- onRecoveryComplete();
-
- initializeBehavior();
}
- }
- }
-
- private void onRecoveredSnapshot(SnapshotOffer offer) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: SnapshotOffer called..", persistenceId());
- }
-
- initRecoveryTimer();
-
- Snapshot snapshot = (Snapshot) offer.snapshot();
- // Create a replicated log with the snapshot information
- // The replicated log can be used later on to retrieve this snapshot
- // when we need to install it on a peer
+ onRecoveryComplete();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
- context.setCommitIndex(snapshot.getLastAppliedIndex());
+ initializeBehavior();
- Stopwatch timer = Stopwatch.createStarted();
-
- // Apply the snapshot to the actors state
- applyRecoverySnapshot(snapshot.getState());
-
- timer.stop();
- LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog().size(), persistenceId(), timer.toString(),
- replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
- }
-
- private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+ raftRecovery = null;
}
-
- replicatedLog().append(logEntry);
- }
-
- private void onRecoveredApplyLogEntries(long toIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, toIndex);
- }
-
- for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog().get(i));
- }
-
- context.setLastApplied(toIndex);
- context.setCommitIndex(toIndex);
- }
-
- private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
-
- int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(currentRecoveryBatchCount == 0) {
- startLogRecoveryBatch(batchSize);
- }
-
- appendRecoveredLogEntry(logEntry.getData());
-
- if(++currentRecoveryBatchCount >= batchSize) {
- endCurrentLogRecoveryBatch();
- }
- }
-
- private void endCurrentLogRecoveryBatch() {
- applyCurrentLogRecoveryBatch();
- currentRecoveryBatchCount = 0;
- }
-
- private void onRecoveryCompletedMessage() {
- if(currentRecoveryBatchCount > 0) {
- endCurrentLogRecoveryBatch();
- }
-
- onRecoveryComplete();
-
- String recoveryTime = "";
- if(recoveryTimer != null) {
- recoveryTimer.stop();
- recoveryTime = " in " + recoveryTimer.toString();
- recoveryTimer = null;
- }
-
- LOG.info(
- "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
-
- initializeBehavior();
}
protected void initializeBehavior(){
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
- @Override public void handleCommand(Object message) {
+ @Override
+ public void handleCommand(Object message) {
+ if(snapshotSupport == null) {
+ snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+ currentBehavior, getRaftActorSnapshotCohort(), self());
+ }
+
+ boolean handled = snapshotSupport.handleSnapshotMessage(message);
+ if(handled) {
+ return;
+ }
+
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
persistence().persist(applyEntries, NoopProcedure.instance());
- } else if(message instanceof ApplySnapshot ) {
- Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm()
- );
- }
-
- applySnapshot(snapshot.getState());
-
- //clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
-
} else if (message instanceof FindLeader) {
getSender().tell(
new FindLeaderReply(getLeaderAddress()),
getSelf()
);
-
- } else if (message instanceof SaveSnapshotSuccess) {
- SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
- long sequenceNumber = success.metadata().sequenceNr();
-
- commitSnapshot(sequenceNumber);
-
- } else if (message instanceof SaveSnapshotFailure) {
- SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
- LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
- persistenceId(), saveSnapshotFailure.cause());
-
- context.getSnapshotManager().rollback();
-
- } else if (message instanceof CaptureSnapshot) {
- LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
- context.getSnapshotManager().create(createSnapshotProcedure);
-
- } else if (message instanceof CaptureSnapshotReply) {
- handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
- } else if (message.equals(COMMIT_SNAPSHOT)) {
- commitSnapshot(-1);
} else {
reusableBehaviorStateHolder.init(getCurrentBehavior());
// Make saving Snapshot successful
// Committing the snapshot here would end up calling commit in the creating state which would
// be a state violation. That's why now we send a message to commit the snapshot.
- self().tell(COMMIT_SNAPSHOT, self());
+ self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
}
});
}
context.setPeerAddress(peerId, peerAddress);
}
- protected void commitSnapshot(long sequenceNumber) {
- context.getSnapshotManager().commit(persistence(), sequenceNumber);
- }
-
/**
* The applyState method will be called by the RaftActor when some data
* needs to be applied to the actor's state
Object data);
/**
- * This method is called during recovery at the start of a batch of state entries. Derived
- * classes should perform any initialization needed to start a batch.
+ * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
- /**
- * This method is called during recovery to append state data to the current batch. This method
- * is called 1 or more times after {@link #startLogRecoveryBatch}.
- *
- * @param data the state data
- */
- protected abstract void appendRecoveredLogEntry(Payload data);
-
- /**
- * This method is called during recovery to reconstruct the state of the actor.
- *
- * @param snapshotBytes A snapshot of the state of the actor
- */
- protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
- /**
- * This method is called during recovery at the end of a batch to apply the current batched
- * log entries. This method is called after {@link #appendRecoveredLogEntry}.
- */
- protected abstract void applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.
protected abstract void onRecoveryComplete();
/**
- * This method will be called by the RaftActor when a snapshot needs to be
- * created. The derived actor should respond with its current state.
- * <p/>
- * During recovery the state that is returned by the derived actor will
- * be passed back to it by calling the applySnapshot method
- *
- * @return The current state of the actor
+ * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
*/
- protected abstract void createSnapshot();
-
- /**
- * This method can be called at any other point during normal
- * operations when the derived actor is out of sync with it's peers
- * and the only way to bring it in sync is by applying a snapshot
- *
- * @param snapshotBytes A snapshot of the state of the actor
- */
- protected abstract void applySnapshot(byte[] snapshotBytes);
+ @Nonnull
+ protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
* This method will be called by the RaftActor when the state of the
return peerAddress;
}
- private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
- context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
- }
-
protected boolean hasFollowers(){
return getRaftActorContext().hasFollowers();
}
}
}
- private class CreateSnapshotProcedure implements Procedure<Void> {
-
- @Override
- public void apply(Void aVoid) throws Exception {
- createSnapshot();
- }
- }
-
private static class BehaviorStateHolder {
private RaftActorBehavior behavior;
private String leaderId;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Interface for a class that participates in raft actor persistence recovery.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorRecoveryCohort {
+
+ /**
+ * This method is called during recovery at the start of a batch of state entries. Derived
+ * classes should perform any initialization needed to start a batch.
+ */
+ void startLogRecoveryBatch(int maxBatchSize);
+
+ /**
+ * This method is called during recovery to append state data to the current batch. This method
+ * is called 1 or more times after {@link #startLogRecoveryBatch}.
+ *
+ * @param data the state data
+ */
+ void appendRecoveredLogEntry(Payload data);
+
+ /**
+ * This method is called during recovery to reconstruct the state of the actor.
+ *
+ * @param snapshotBytes A snapshot of the state of the actor
+ */
+ void applyRecoverySnapshot(byte[] snapshotBytes);
+
+ /**
+ * This method is called during recovery at the end of a batch to apply the current batched
+ * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+ */
+ void applyCurrentLogRecoveryBatch();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
+import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Support class that handles persistence recovery for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorRecoverySupport {
+ private final DataPersistenceProvider persistence;
+ private final RaftActorContext context;
+ private final RaftActorBehavior currentBehavior;
+ private final RaftActorRecoveryCohort cohort;
+
+ private int currentRecoveryBatchCount;
+
+ private Stopwatch recoveryTimer;
+ private final Logger log;
+
+ RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
+ RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
+ this.persistence = persistence;
+ this.context = context;
+ this.currentBehavior = currentBehavior;
+ this.cohort = cohort;
+ this.log = context.getLogger();
+ }
+
+ boolean handleRecoveryMessage(Object message) {
+ boolean recoveryComplete = false;
+ if(persistence.isRecoveryApplicable()) {
+ if (message instanceof SnapshotOffer) {
+ onRecoveredSnapshot((SnapshotOffer) message);
+ } else if (message instanceof ReplicatedLogEntry) {
+ onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+ } else if (message instanceof ApplyLogEntries) {
+ // Handle this message for backwards compatibility with pre-Lithium versions.
+ onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+ } else if (message instanceof ApplyJournalEntries) {
+ onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
+ } else if (message instanceof DeleteEntries) {
+ replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+ } else if (message instanceof RecoveryCompleted) {
+ onRecoveryCompletedMessage();
+ recoveryComplete = true;
+ }
+ } else if (message instanceof RecoveryCompleted) {
+ recoveryComplete = true;
+ }
+
+ return recoveryComplete;
+ }
+
+ private ReplicatedLog replicatedLog() {
+ return context.getReplicatedLog();
+ }
+
+ private void initRecoveryTimer() {
+ if(recoveryTimer == null) {
+ recoveryTimer = Stopwatch.createStarted();
+ }
+ }
+
+ private void onRecoveredSnapshot(SnapshotOffer offer) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: SnapshotOffer called..", context.getId());
+ }
+
+ initRecoveryTimer();
+
+ Snapshot snapshot = (Snapshot) offer.snapshot();
+
+ // Create a replicated log with the snapshot information
+ // The replicated log can be used later on to retrieve this snapshot
+ // when we need to install it on a peer
+
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+ Stopwatch timer = Stopwatch.createStarted();
+
+ // Apply the snapshot to the actors state
+ cohort.applyRecoverySnapshot(snapshot.getState());
+
+ timer.stop();
+ log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
+ replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ }
+
+ private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
+ logEntry.getIndex(), logEntry.size());
+ }
+
+ replicatedLog().append(logEntry);
+ }
+
+ private void onRecoveredApplyLogEntries(long toIndex) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ context.getId(), context.getLastApplied() + 1, toIndex);
+ }
+
+ for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
+ batchRecoveredLogEntry(replicatedLog().get(i));
+ }
+
+ context.setLastApplied(toIndex);
+ context.setCommitIndex(toIndex);
+ }
+
+ private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+ initRecoveryTimer();
+
+ int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+ if(currentRecoveryBatchCount == 0) {
+ cohort.startLogRecoveryBatch(batchSize);
+ }
+
+ cohort.appendRecoveredLogEntry(logEntry.getData());
+
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
+ }
+ }
+
+ private void endCurrentLogRecoveryBatch() {
+ cohort.applyCurrentLogRecoveryBatch();
+ currentRecoveryBatchCount = 0;
+ }
+
+ private void onRecoveryCompletedMessage() {
+ if(currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+
+ String recoveryTime = "";
+ if(recoveryTimer != null) {
+ recoveryTimer.stop();
+ recoveryTime = " in " + recoveryTimer.toString();
+ recoveryTimer = null;
+ }
+
+ log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+ "Persistence Id = " + context.getId() +
+ " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
+ "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+ replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for a class that participates in raft actor snapshotting.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorSnapshotCohort {
+
+ /**
+ * This method is called by the RaftActor when a snapshot needs to be
+ * created. The implementation should send a CaptureSnapshotReply to the given actor.
+ *
+ * @param actorRef the actor to which to respond
+ */
+ void createSnapshot(ActorRef actorRef);
+
+ /**
+ * This method is called to apply a snapshot installed by the leader.
+ *
+ * @param snapshotBytes a snapshot of the state of the actor
+ */
+ void applySnapshot(byte[] snapshotBytes);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Handles snapshot related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorSnapshotMessageSupport {
+ static final String COMMIT_SNAPSHOT = "commit_snapshot";
+
+ private final DataPersistenceProvider persistence;
+ private final RaftActorContext context;
+ private final RaftActorBehavior currentBehavior;
+ private final RaftActorSnapshotCohort cohort;
+ private final ActorRef raftActorRef;
+ private final Logger log;
+
+ private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
+ @Override
+ public void apply(Void notUsed) throws Exception {
+ cohort.createSnapshot(raftActorRef);
+ }
+ };
+
+ RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
+ RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
+ this.persistence = persistence;
+ this.context = context;
+ this.currentBehavior = currentBehavior;
+ this.cohort = cohort;
+ this.raftActorRef = raftActorRef;
+ this.log = context.getLogger();
+ }
+
+ boolean handleSnapshotMessage(Object message) {
+ if(message instanceof ApplySnapshot ) {
+ onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+ return true;
+ } else if (message instanceof SaveSnapshotSuccess) {
+ onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+ return true;
+ } else if (message instanceof SaveSnapshotFailure) {
+ onSaveSnapshotFailure((SaveSnapshotFailure) message);
+ return true;
+ } else if (message instanceof CaptureSnapshot) {
+ onCaptureSnapshot(message);
+ return true;
+ } else if (message instanceof CaptureSnapshotReply) {
+ onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+ return true;
+ } else if (message.equals(COMMIT_SNAPSHOT)) {
+ context.getSnapshotManager().commit(persistence, -1);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void onCaptureSnapshotReply(byte[] snapshotBytes) {
+ log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+
+ context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
+ }
+
+ private void onCaptureSnapshot(Object message) {
+ log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
+
+ context.getSnapshotManager().create(createSnapshotProcedure);
+ }
+
+ private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
+ log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+ context.getId(), saveSnapshotFailure.cause());
+
+ context.getSnapshotManager().rollback();
+ }
+
+ private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
+ log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
+
+ long sequenceNumber = success.metadata().sequenceNr();
+
+ context.getSnapshotManager().commit(persistence, sequenceNumber);
+ }
+
+ private void onApplySnapshot(Snapshot snapshot) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
+ snapshot.getLastAppliedTerm());
+ }
+
+ cohort.applySnapshot(snapshot.getState());
+
+ //clears the followers log, sets the snapshot index to ensure adjusted-index works
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
+ currentBehavior));
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ }
+}
* information
*
* @param index the index of the log entry
+ * @return the adjusted index of the first log entry removed or -1 if log entry not found.
*/
- void removeFrom(long index);
+ long removeFrom(long index);
/**
private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
@Override
public void apply(DeleteEntries param) {
- dataSize = 0;
- for (ReplicatedLogEntry entry : journal) {
- dataSize += entry.size();
- }
}
};
@Override
public void removeFromAndPersist(long logEntryIndex) {
- int adjustedIndex = adjustedIndex(logEntryIndex);
-
- if (adjustedIndex < 0) {
- return;
- }
-
// FIXME: Maybe this should be done after the command is saved
- journal.subList(adjustedIndex , journal.size()).clear();
-
- persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+ long adjustedIndex = removeFrom(logEntryIndex);
+ if(adjustedIndex >= 0) {
+ persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure);
+ }
}
@Override
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- journal.add(replicatedLogEntry);
+ append(replicatedLogEntry);
// When persisting events with persist it is guaranteed that the
// persistent actor will not receive further commands between the
public void apply(ReplicatedLogEntry evt) throws Exception {
int logEntrySize = replicatedLogEntry.size();
- dataSize += logEntrySize;
- long dataSizeForCheck = dataSize;
+ long dataSizeForCheck = dataSize();
dataSizeSinceLastSnapshot += logEntrySize;
}
@Override
- protected void createSnapshot() {
+ public void createSnapshot(ActorRef actorRef) {
if(snapshot != null) {
getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
}
}
@Override
- protected void applyRecoverySnapshot(byte[] bytes) {
+ public void applyRecoverySnapshot(byte[] bytes) {
}
void setSnapshot(byte[] snapshot) {
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
}
- @After
- public void tearDown() {
- replicatedLogImpl.journal.clear();
- replicatedLogImpl.setSnapshotIndex(-1);
- replicatedLogImpl.setSnapshotTerm(-1);
- replicatedLogImpl = null;
- }
-
@Test
public void testIndexOperations() {
// now create a snapshot of 3 entries, with 1 unapplied entry left in the log
// It removes the entries which have made it to snapshot
// and updates the snapshot index and term
- Map<Long, String> state = takeSnapshot(3);
+ takeSnapshot(3);
// check the values after the snapshot.
// each index value passed in the test is the logical index (log entry index)
assertEquals(2, replicatedLogImpl.getFrom(6).size());
// take a second snapshot with 5 entries with 0 unapplied entries left in the log
- state = takeSnapshot(5);
+ takeSnapshot(5);
assertEquals(0, replicatedLogImpl.size());
assertNull(replicatedLogImpl.last());
assertTrue(replicatedLogImpl.isPresent(5));
}
+ @Test
+ public void testRemoveFrom() {
+
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E", 2)));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F", 3)));
+
+ assertEquals("dataSize", 9, replicatedLogImpl.dataSize());
+
+ long adjusted = replicatedLogImpl.removeFrom(4);
+ assertEquals("removeFrom - adjusted", 4, adjusted);
+ assertEquals("size", 4, replicatedLogImpl.size());
+ assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+
+ takeSnapshot(1);
+
+ adjusted = replicatedLogImpl.removeFrom(2);
+ assertEquals("removeFrom - adjusted", 1, adjusted);
+ assertEquals("size", 1, replicatedLogImpl.size());
+ assertEquals("dataSize", 1, replicatedLogImpl.dataSize());
+
+ assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(0));
+ assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(100));
+ }
+
// create a snapshot for test
public Map<Long, String> takeSnapshot(final int numEntries) {
Map<Long, String> map = new HashMap<>(numEntries);
- List<ReplicatedLogEntry> entries = replicatedLogImpl.getEntriesTill(numEntries);
- for (ReplicatedLogEntry entry : entries) {
+
+ long lastIndex = 0;
+ long lastTerm = 0;
+ for(int i = 0; i < numEntries; i++) {
+ ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i);
map.put(entry.getIndex(), entry.getData().toString());
+ lastIndex = entry.getIndex();
+ lastTerm = entry.getTerm();
}
- int term = (int) replicatedLogImpl.lastTerm();
- int lastIndex = (int) entries.get(entries.size() - 1).getIndex();
- entries.clear();
- replicatedLogImpl.setSnapshotTerm(term);
- replicatedLogImpl.setSnapshotIndex(lastIndex);
+ replicatedLogImpl.snapshotPreCommit(lastIndex, lastTerm);
+ replicatedLogImpl.snapshotCommit();
return map;
public void removeFromAndPersist(final long index) {
}
- @Override
- public int dataSize() {
- return -1;
- }
-
- public List<ReplicatedLogEntry> getEntriesTill(final int index) {
- return journal.subList(0, index);
- }
-
@Override
public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
}
this.size = size;
}
- @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
- Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<GeneratedMessage.GeneratedExtension, String>();
+ @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
+ Map<GeneratedMessage.GeneratedExtension<?, ?>, String> map = new HashMap<>();
map.put(MockPayloadMessages.value, value);
return map;
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
InMemorySnapshotStore.clear();
}
- public static class MockRaftActor extends RaftActor {
+ public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
- private final RaftActor delegate;
+ private final RaftActor actorDelegate;
+ private final RaftActorRecoveryCohort recoveryCohortDelegate;
+ private final RaftActorSnapshotCohort snapshotCohortDelegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
- this.delegate = mock(RaftActor.class);
+ this.actorDelegate = mock(RaftActor.class);
+ this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+ this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
if(dataPersistenceProvider == null){
setPersistence(true);
} else {
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
- delegate.applyState(clientActor, identifier, data);
+ actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called", persistenceId());
}
@Override
- protected void startLogRecoveryBatch(int maxBatchSize) {
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return this;
}
@Override
- protected void appendRecoveredLogEntry(Payload data) {
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return this;
+ }
+
+ @Override
+ public void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ public void appendRecoveredLogEntry(Payload data) {
state.add(data);
}
@Override
- protected void applyCurrentLogRecoveryBatch() {
+ public void applyCurrentLogRecoveryBatch() {
}
@Override
protected void onRecoveryComplete() {
- delegate.onRecoveryComplete();
+ actorDelegate.onRecoveryComplete();
recoveryComplete.countDown();
}
}
@Override
- protected void applyRecoverySnapshot(byte[] bytes) {
- delegate.applyRecoverySnapshot(bytes);
+ public void applyRecoverySnapshot(byte[] bytes) {
+ recoveryCohortDelegate.applyRecoverySnapshot(bytes);
try {
Object data = toObject(bytes);
if (data instanceof List) {
}
}
- @Override protected void createSnapshot() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
LOG.info("{}: createSnapshot called", persistenceId());
- delegate.createSnapshot();
+ snapshotCohortDelegate.createSnapshot(actorRef);
}
- @Override protected void applySnapshot(byte [] snapshot) {
+ @Override
+ public void applySnapshot(byte [] snapshot) {
LOG.info("{}: applySnapshot called", persistenceId());
- delegate.applySnapshot(snapshot);
+ snapshotCohortDelegate.applySnapshot(snapshot);
}
- @Override protected void onStateChanged() {
- delegate.onStateChanged();
+ @Override
+ protected void onStateChanged() {
+ actorDelegate.onStateChanged();
}
@Override
public ReplicatedLog getReplicatedLog(){
return this.getRaftActorContext().getReplicatedLog();
}
-
}
// add more entries after snapshot is taken
List<ReplicatedLogEntry> entries = new ArrayList<>();
ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
- new MockRaftActorContext.MockPayload("F"));
+ new MockRaftActorContext.MockPayload("F", 2));
ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
- new MockRaftActorContext.MockPayload("G"));
+ new MockRaftActorContext.MockPayload("G", 3));
ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
- new MockRaftActorContext.MockPayload("H"));
+ new MockRaftActorContext.MockPayload("H", 4));
entries.add(entry2);
entries.add(entry3);
entries.add(entry4);
RaftActorContext context = ref.underlyingActor().getRaftActorContext();
assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
context.getReplicatedLog().size());
+ assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+ verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+ verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
- verify(mockRaftActor.delegate).createSnapshot();
+ verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
- verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+ verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
}
};
mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
- verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
+ verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
assertTrue("The replicatedLog should have changed",
oldReplicatedLog != mockRaftActor.getReplicatedLog());
.capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
new MockRaftActorContext.MockPayload("x")), 4);
- verify(leaderActor.delegate).createSnapshot();
+ verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("D")), 4);
- verify(followerActor.delegate).createSnapshot();
+ verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
assertEquals(6, followerActor.getReplicatedLog().size());
import org.opendaylight.controller.sal.binding.test.util.BindingBrokerTestFactory;
import org.opendaylight.controller.sal.binding.test.util.BindingTestContext;
-@SuppressWarnings("deprecation")
public abstract class AbstractDataServiceTest {
protected DataProviderService baDataService;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
* FIXME: THis test should be moved to sal-binding-broker and rewriten
* to use new DataBroker API
*/
-@SuppressWarnings("deprecation")
public class ConcurrentImplicitCreateTest extends AbstractDataServiceTest {
private static final TopLevelListKey FOO_KEY = new TopLevelListKey("foo");
private static final TopLevelListKey BAR_KEY = new TopLevelListKey("bar");
- private static InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier.builder(Top.class).build();
- private static InstanceIdentifier<TopLevelList> FOO_PATH = TOP_PATH.child(TopLevelList.class, FOO_KEY);
- private static InstanceIdentifier<TopLevelList> BAR_PATH = TOP_PATH.child(TopLevelList.class, BAR_KEY);
+ private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier.builder(Top.class).build();
+ private static final InstanceIdentifier<TopLevelList> FOO_PATH = TOP_PATH.child(TopLevelList.class, FOO_KEY);
+ private static final InstanceIdentifier<TopLevelList> BAR_PATH = TOP_PATH.child(TopLevelList.class, BAR_KEY);
@Test
public void testConcurrentCreate() throws InterruptedException, ExecutionException {
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
+import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* FIXME: THis test should be moved to compat test-suite
*/
-@SuppressWarnings("deprecation")
public class WildcardedDataChangeListenerTest extends AbstractDataServiceTest {
private static final TopLevelListKey TOP_LEVEL_LIST_0_KEY = new TopLevelListKey("test:0");
package org.opendaylight.controller.sal.binding.test.bugfix;
import static org.junit.Assert.assertFalse;
-
+import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.ExecutionException;
-
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import com.google.common.util.concurrent.SettableFuture;
-
-@SuppressWarnings("deprecation")
public class DeleteNestedAugmentationListenParentTest extends AbstractDataServiceTest {
private static final TopLevelListKey FOO_KEY = new TopLevelListKey("foo");
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-
+import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.TimeUnit;
-
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import com.google.common.util.concurrent.SettableFuture;
-
-@SuppressWarnings("deprecation")
public class WriteParentListenAugmentTest extends AbstractDataServiceTest {
private static final String TLL_NAME = "foo";
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
+import com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import com.google.common.collect.ImmutableList;
-
-@SuppressWarnings("deprecation")
public class WriteParentReadChildTest extends AbstractDataServiceTest {
private static final int LIST11_ID = 1234;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-
import java.util.concurrent.Future;
-
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
-/**
- * FIXME: Migrate to use new Data Broker APIs
- */
-@SuppressWarnings("deprecation")
public class BrokerIntegrationTest extends AbstractDataServiceTest {
private static final TopLevelListKey TLL_FOO_KEY = new TopLevelListKey("foo");
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-@SuppressWarnings("deprecation")
public class CrossBrokerMountPointTest {
private static final QName TLL_NAME_QNAME = QName.create(TopLevelList.QNAME, "name");
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
assertNotNull(moduleStream);
final List<InputStream> rpcModels = Collections.singletonList(moduleStream);
- @SuppressWarnings("deprecation")
- final
- Set<Module> modules = parser.parseYangModelsFromStreams(rpcModels);
- @SuppressWarnings("deprecation")
- final
- SchemaContext mountSchemaContext = parser.resolveSchemaContext(modules);
+ final Set<Module> modules = parser.parseYangModelsFromStreams(rpcModels);
+ final SchemaContext mountSchemaContext = parser.resolveSchemaContext(modules);
schemaContext = mountSchemaContext;
}
.child(TopLevelList.class, new TopLevelListKey(mount)).toInstance();
}
- @SuppressWarnings("deprecation")
@Test
public void test() throws ExecutionException, InterruptedException {
// FIXME: This is made to only make sure instance identifier codec
return config;
}
- public static abstract class Builder<T extends Builder>{
+ public static abstract class Builder<T extends Builder<T>> {
protected Map<String, Object> configHolder;
protected Config fallback;
return cachedMailBoxPushTimeout;
}
- public static class Builder<T extends Builder> extends AbstractConfig.Builder<T>{
+ public static class Builder<T extends Builder<T>> extends AbstractConfig.Builder<T>{
public Builder(String actorSystemName) {
super(actorSystemName);
return builder.build();
}
- private NormalizedNode<?, ?> buildDataContainer(DataContainerNodeBuilder builder, NormalizedNodeMessages.Node node){
+ private NormalizedNode<?, ?> buildDataContainer(DataContainerNodeBuilder<?, ?> builder, NormalizedNodeMessages.Node node){
for(NormalizedNodeMessages.Node child : node.getChildList()){
builder.withChild((DataContainerChild<?, ?>) deSerialize(child));
@Override
- public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+ public Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> encode() {
Preconditions.checkState(byteString!=null);
- Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+ Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> map = new HashMap<>();
map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
getModificationInternal());
return map;
this.modification = (PersistentMessages.CompositeModification) Preconditions.checkNotNull(modification, "modification should not be null");
}
- @Override public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+ @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> encode() {
Preconditions.checkState(modification!=null);
- Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+ Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> map = new HashMap<>();
map.put(
org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification, this.modification);
return map;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
*/
public class Shard extends RaftActor {
- private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@VisibleForTesting
private DatastoreContext datastoreContext;
- private SchemaContext schemaContext;
-
- private int createSnapshotTransactionCounter;
-
private final ShardCommitCoordinator commitCoordinator;
private long transactionCommitTimeout;
private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
Serialization.serializedActorPath(getSelf()));
+ private final DOMTransactionFactory domTransactionFactory;
- /**
- * Coordinates persistence recovery on startup.
- */
- private ShardRecoveryCoordinator recoveryCoordinator;
+ private final ShardTransactionActorFactory transactionActorFactory;
- private final DOMTransactionFactory transactionFactory;
-
- private final String txnDispatcherPath;
+ private final ShardSnapshotCohort snapshotCohort;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
this.name = name.toString();
this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
- this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
- .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
setPersistence(datastoreContext.isPersistent());
getContext().become(new MeteringBehavior(this));
}
- transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+ domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
- commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+ commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
- recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
+ transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+ new Dispatchers(context().system().dispatchers()).getDispatcherPath(
+ Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+
+ snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
}
private void setTransactionCommitTimeout() {
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- DOMStoreTransaction transaction = transactionFactory.newTransaction(
- TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
- transactionChainId);
-
- return createShardTransaction(transaction, transactionId, clientVersion);
- }
-
- private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
- short clientVersion){
- return getContext().actorOf(
- ShardTransaction.props(transaction, getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion)
- .withDispatcher(txnDispatcherPath),
- transactionId.toString());
-
+ return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+ transactionId, transactionChainId, clientVersion);
}
private void createTransaction(CreateTransaction createTransaction) {
return transactionActor;
}
- private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
- throws ExecutionException, InterruptedException {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- commitCohort.preCommit().get();
- commitCohort.commit().get();
- }
-
private void commitWithNewTransaction(final Modification modification) {
DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
modification.apply(tx);
try {
- syncCommitTransaction(tx);
+ snapshotCohort.syncCommitTransaction(tx);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
}
private void updateSchemaContext(final UpdateSchemaContext message) {
- this.schemaContext = message.getSchemaContext();
updateSchemaContext(message.getSchemaContext());
- store.onGlobalContextUpdated(message.getSchemaContext());
}
@VisibleForTesting
}
@Override
- protected
- void startLogRecoveryBatch(final int maxBatchSize) {
- recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
- }
-
- @Override
- protected void appendRecoveredLogEntry(final Payload data) {
- recoveryCoordinator.appendRecoveredLogPayload(data);
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return snapshotCohort;
}
@Override
- protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
- }
-
- @Override
- protected void applyCurrentLogRecoveryBatch() {
- recoveryCoordinator.applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
- recoveryCoordinator = null;
-
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
}
}
- @Override
- protected void createSnapshot() {
- // Create a transaction actor. We are really going to treat the transaction as a worker
- // so that this actor does not get block building the snapshot. THe transaction actor will
- // after processing the CreateSnapshot message.
-
- ActorRef createSnapshotTransaction = createTransaction(
- TransactionProxy.TransactionType.READ_ONLY.ordinal(),
- "createSnapshot" + ++createSnapshotTransactionCounter, "",
- DataStoreVersions.CURRENT_VERSION);
-
- createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
- }
-
- @VisibleForTesting
- @Override
- protected void applySnapshot(final byte[] snapshotBytes) {
- // Since this will be done only on Recovery or when this actor is a Follower
- // we can safely commit everything in here. We not need to worry about event notifications
- // as they would have already been disabled on the follower
-
- LOG.info("{}: Applying snapshot", persistenceId());
- try {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- // delete everything first
- transaction.delete(DATASTORE_ROOT);
-
- // Add everything from the remote node back
- transaction.write(DATASTORE_ROOT, node);
- syncCommitTransaction(transaction);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
- } finally {
- LOG.info("{}: Done applying snapshot", persistenceId());
- }
- }
-
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
persistenceId(), getId());
}
- transactionFactory.closeAllTransactionChains();
+ domTransactionFactory.closeAllTransactionChains();
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* @author: syedbahm
private final DOMStoreReadTransaction transaction;
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- short clientTxVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super(shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* @author: syedbahm
private final DOMStoreReadWriteTransaction transaction;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- short clientTxVersion) {
- super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
*
* @author Thomas Panetelis
*/
-class ShardRecoveryCoordinator {
+class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
private final InMemoryDOMDataStore store;
private List<ModificationPayload> currentLogRecoveryBatch;
this.log = log;
}
- void startLogRecoveryBatch(int maxBatchSize) {
+ @Override
+ public void startLogRecoveryBatch(int maxBatchSize) {
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
}
- void appendRecoveredLogPayload(Payload payload) {
+ @Override
+ public void appendRecoveredLogEntry(Payload payload) {
try {
if(payload instanceof ModificationPayload) {
currentLogRecoveryBatch.add((ModificationPayload) payload);
/**
* Applies the current batched log entries to the data store.
*/
- void applyCurrentLogRecoveryBatch() {
+ @Override
+ public void applyCurrentLogRecoveryBatch() {
log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
*
* @param snapshotBytes the serialized snapshot
*/
- void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+ @Override
+ public void applyRecoverySnapshot(final byte[] snapshotBytes) {
log.debug("{}: Applyng recovered sbapshot", shardName);
DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+
+/**
+ * Participates in raft snapshotting on behalf of a Shard actor.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+
+ private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
+ private int createSnapshotTransactionCounter;
+ private final ShardTransactionActorFactory transactionActorFactory;
+ private final InMemoryDOMDataStore store;
+ private final Logger log;
+ private final String logId;
+
+ ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+ Logger log, String logId) {
+ this.transactionActorFactory = transactionActorFactory;
+ this.store = store;
+ this.log = log;
+ this.logId = logId;
+ }
+
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
+ // Create a transaction actor. We are really going to treat the transaction as a worker
+ // so that this actor does not get block building the snapshot. THe transaction actor will
+ // after processing the CreateSnapshot message.
+
+ ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
+ "createSnapshot" + ++createSnapshotTransactionCounter);
+
+ ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
+ TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+
+ createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
+ }
+
+ @Override
+ public void applySnapshot(byte[] snapshotBytes) {
+ // Since this will be done only on Recovery or when this actor is a Follower
+ // we can safely commit everything in here. We not need to worry about event notifications
+ // as they would have already been disabled on the follower
+
+ log.info("{}: Applying snapshot", logId);
+
+ try {
+ DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+ // delete everything first
+ transaction.delete(DATASTORE_ROOT);
+
+ // Add everything from the remote node back
+ transaction.write(DATASTORE_ROOT, node);
+ syncCommitTransaction(transaction);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("{}: An exception occurred when applying snapshot", logId, e);
+ } finally {
+ log.info("{}: Done applying snapshot", logId);
+ }
+
+ }
+
+ void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+ throws ExecutionException, InterruptedException {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ }
+}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* The ShardTransaction Actor represents a remote transaction
protected static final boolean SERIALIZED_REPLY = true;
private final ActorRef shardActor;
- private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
private final short clientTxVersion;
- protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- ShardStats shardStats, String transactionID, short clientTxVersion) {
+ protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
+ short clientTxVersion) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
- this.schemaContext = schemaContext;
this.shardStats = shardStats;
this.transactionID = transactionID;
this.clientTxVersion = clientTxVersion;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
- String transactionID, short txnClientVersion) {
- return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+ DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
+ return Props.create(new ShardTransactionCreator(transaction, shardActor,
datastoreContext, shardStats, transactionID, txnClientVersion));
}
return transactionID;
}
- protected SchemaContext getSchemaContext() {
- return schemaContext;
- }
-
protected short getClientTxVersion() {
return clientTxVersion;
}
final DOMStoreTransaction transaction;
final ActorRef shardActor;
- final SchemaContext schemaContext;
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
final short txnClientVersion;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, DatastoreContext datastoreContext,
- ShardStats shardStats, String transactionID, short txnClientVersion) {
+ DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
this.transaction = transaction;
this.shardActor = shardActor;
this.shardStats = shardStats;
- this.schemaContext = schemaContext;
this.datastoreContext = datastoreContext;
this.transactionID = transactionID;
this.txnClientVersion = txnClientVersion;
ShardTransaction tx;
if(transaction instanceof DOMStoreReadWriteTransaction) {
tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ shardActor, shardStats, transactionID, txnClientVersion);
} else if(transaction instanceof DOMStoreReadTransaction) {
tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- schemaContext, shardStats, transactionID, txnClientVersion);
+ shardStats, transactionID, txnClientVersion);
} else {
tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ shardActor, shardStats, transactionID, txnClientVersion);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
private final DOMStoreTransactionChain chain;
private final DatastoreContext datastoreContext;
- private final SchemaContext schemaContext;
private final ShardStats shardStats;
- public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, ShardStats shardStats) {
+ public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+ ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
this.shardStats = shardStats;
}
TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId(),
+ datastoreContext, shardStats, createTransaction.getTransactionId(),
createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
- schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId(),
+ datastoreContext, shardStats, createTransaction.getTransactionId(),
createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
- schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId(),
+ datastoreContext, shardStats, createTransaction.getTransactionId(),
createTransaction.getVersion()), transactionName);
} else {
throw new IllegalArgumentException (
public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
DatastoreContext datastoreContext, ShardStats shardStats) {
- return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
- datastoreContext, shardStats));
+ return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
}
private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
final DOMStoreTransactionChain chain;
final DatastoreContext datastoreContext;
- final SchemaContext schemaContext;
final ShardStats shardStats;
- ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
- DatastoreContext datastoreContext, ShardStats shardStats) {
+ ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+ ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
this.shardStats = shardStats;
}
@Override
public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
+ return new ShardTransactionChain(chain, datastoreContext, shardStats);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+
+/**
+ * A factory for creating ShardTransaction actors.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardTransactionActorFactory {
+
+ private final DOMTransactionFactory domTransactionFactory;
+ private final DatastoreContext datastoreContext;
+ private final String txnDispatcherPath;
+ private final ShardStats shardMBean;
+ private final UntypedActorContext actorContext;
+ private final ActorRef shardActor;
+
+ ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+ String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
+ this.domTransactionFactory = domTransactionFactory;
+ this.datastoreContext = datastoreContext;
+ this.txnDispatcherPath = txnDispatcherPath;
+ this.shardMBean = shardMBean;
+ this.actorContext = actorContext;
+ this.shardActor = shardActor;
+ }
+
+ ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
+ String transactionChainID, short clientVersion) {
+
+ DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
+ transactionChainID);
+
+ return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+ transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
+ transactionID.toString());
+ }
+}
/*
- *
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* @author: syedbahm
public class ShardWriteTransaction extends ShardTransaction {
private final MutableCompositeModification compositeModification = new MutableCompositeModification();
+ private int totalBatchedModificationsReceived;
+ private Exception lastBatchedModificationsException;
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- short clientTxVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super(shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
modification.apply(transaction);
}
+ totalBatchedModificationsReceived++;
if(batched.isReady()) {
+ if(lastBatchedModificationsException != null) {
+ throw lastBatchedModificationsException;
+ }
+
+ if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+ throw new IllegalStateException(String.format(
+ "The total number of batched messages received %d does not match the number sent %d",
+ totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
+ }
+
readyTransaction(transaction, false);
} else {
getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
}
} catch (Exception e) {
+ lastBatchedModificationsException = e;
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+ if(batched.isReady()) {
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
}
}
/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
+ private int totalBatchedModificationsSent;
protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
}
batchedModifications.setReady(ready);
+ batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
sent = executeOperationAsync(batchedModifications);
if(ready) {
private static final long serialVersionUID = 1L;
private boolean ready;
+ private int totalMessagesSent;
private String transactionID;
private String transactionChainID;
this.ready = ready;
}
+ public int getTotalMessagesSent() {
+ return totalMessagesSent;
+ }
+
+ public void setTotalMessagesSent(int totalMessagesSent) {
+ this.totalMessagesSent = totalMessagesSent;
+ }
+
public String getTransactionID() {
return transactionID;
}
transactionID = in.readUTF();
transactionChainID = in.readUTF();
ready = in.readBoolean();
+ totalMessagesSent = in.readInt();
}
@Override
out.writeUTF(transactionID);
out.writeUTF(transactionChainID);
out.writeBoolean(ready);
+ out.writeInt(totalMessagesSent);
}
@Override
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
- .append(", modifications size=").append(getModifications().size()).append("]");
+ builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=")
+ .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=")
+ .append(totalMessagesSent).append(", modifications size=").append(getModifications().size())
+ .append("]");
return builder.toString();
}
}
out.write(serializedPayload);
}
- @SuppressWarnings("rawtypes")
@Override
@Deprecated
public <T> Map<GeneratedExtension, T> encode() {
private static final Context NO_OP_CONTEXT = new NoOpContext();
- private final Class expectedMessageClass;
+ private final Class<?> expectedMessageClass;
private final long expectedArrivalInterval;
* @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
* message
*/
- public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){
+ public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
this.expectedMessageClass = expectedMessageClass;
this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
}
}
public static class MessageProcessingTime {
- private final Class messageClass;
+ private final Class<?> messageClass;
private final long elapsedTimeInNanos;
- MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){
+ MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
this.messageClass = messageClass;
this.elapsedTimeInNanos = elapsedTimeInNanos;
}
'}';
}
- public Class getMessageClass() {
+ public Class<?> getMessageClass() {
return messageClass;
}
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
@SuppressWarnings("serial")
public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+ final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+
final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
TestPersistentDataProvider(DataPersistenceProvider delegate) {
dataStoreContextBuilder.persistent(persistent);
new ShardTestKit(getSystem()) {{
- final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
class TestShard extends Shard {
protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
}
@Override
- protected void commitSnapshot(final long sequenceNumber) {
- super.commitSnapshot(sequenceNumber);
- latch.get().countDown();
+ public void handleCommand(Object message) {
+ super.handleCommand(message);
+
+ if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+ latch.get().countDown();
+ }
}
@Override
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.actor.Status.Failure;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
short version) {
Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
- testSchemaContext, datastoreContext, shardStats, "txn", version);
+ datastoreContext, shardStats, "txn", version);
return getSystem().actorOf(props, name);
}
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
- batched.setReady(true);
batched.addModification(new WriteModification(writePath, writeData));
transaction.tell(batched, getRef());
+ BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+ assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+ batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+ transaction.tell(batched, getRef());
expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
}};
}
+ @Test(expected=TestException.class)
+ public void testOnReceiveBatchedModificationsFailure() throws Throwable {
+ new JavaTestKit(getSystem()) {{
+
+ DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+ final ActorRef transaction = newTransactionActor(mockWriteTx,
+ "testOnReceiveBatchedModificationsFailure");
+
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doThrow(new TestException()).when(mockWriteTx).write(path, node);
+
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.addModification(new WriteModification(path, node));
+
+ transaction.tell(batched, getRef());
+ expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, getRef());
+ Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+ if(failure != null) {
+ throw failure.cause();
+ }
+ }};
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+ new JavaTestKit(getSystem()) {{
+
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
+
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, getRef());
+
+ Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+ if(failure != null) {
+ throw failure.cause();
+ }
+ }};
+ }
+
@Test
public void testOnReceivePreLithiumReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn",
- DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
expectMsgClass(duration("3 seconds"), Terminated.class);
}};
}
+
+ public static class TestException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ }
}
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
- verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
- isA(BatchedModifications.class));
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), true,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
}
@Test
verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
new DeleteModification(deletePath2));
+
+ assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
}
@Test
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
batched.setReady(true);
+ batched.setTotalMessagesSent(5);
BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
(Serializable) batched.toSerializable());
assertEquals("getTransactionID", "tx1", clone.getTransactionID());
assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
assertEquals("isReady", true, clone.isReady());
+ assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
assertEquals("getModifications size", 3, clone.getModifications().size());
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
if("messages".equals(message)){
- getSender().tell(new ArrayList(messages), getSelf());
+ getSender().tell(new ArrayList<>(messages), getSelf());
}
} else {
messages.add(message);
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractCheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
* A {@link Future} used to report the status of an future {@link java.util.concurrent.Future}.
*/
final class PingPongFuture extends AbstractCheckedFuture<Void, TransactionCommitFailedException> {
- protected PingPongFuture(final ListenableFuture<Void> delegate) {
- super(delegate);
- }
+ protected PingPongFuture(final ListenableFuture<Void> delegate) {
+ super(delegate);
+ }
- @Override
- protected TransactionCommitFailedException mapException(final Exception e) {
- Preconditions.checkArgument(e instanceof TransactionCommitFailedException);
- return (TransactionCommitFailedException) e;
+ @Override
+ protected TransactionCommitFailedException mapException(final Exception e) {
+ if (e.getCause() instanceof TransactionCommitFailedException){
+ return (TransactionCommitFailedException) e.getCause();
+ } else {
+ return new TransactionCommitFailedException(e.getMessage(), e.getCause(), null);
}
+ }
}
+