(configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
delegatingPersistenceProvider, LOG);
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
}
@Override
context.getConfigParams().getJournalRecoveryLogBatchSize());
super.preStart();
+
+ snapshotSupport = newRaftActorSnapshotMessageSupport();
}
@Override
}
protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
- return new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
- getRaftActorRecoveryCohort());
+ return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
}
protected void initializeBehavior(){
@Override
public void handleCommand(Object message) {
- if(snapshotSupport == null) {
- snapshotSupport = newRaftActorSnapshotMessageSupport();
- }
-
- boolean handled = snapshotSupport.handleSnapshotMessage(message);
- if(handled) {
- return;
- }
-
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
);
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
- } else {
+ } else if(!snapshotSupport.handleSnapshotMessage(message)) {
reusableBehaviorStateHolder.init(getCurrentBehavior());
setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
}
protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
- return new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
- currentBehavior, getRaftActorSnapshotCohort());
+ return new RaftActorSnapshotMessageSupport(context, currentBehavior,
+ getRaftActorSnapshotCohort());
}
private void onGetOnDemandRaftStats() {
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
* @author Thomas Pantelis
*/
class RaftActorRecoverySupport {
- private final DataPersistenceProvider persistence;
private final RaftActorContext context;
private final RaftActorBehavior currentBehavior;
private final RaftActorRecoveryCohort cohort;
private Stopwatch recoveryTimer;
private final Logger log;
- RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
- RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
- this.persistence = persistence;
+ RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
+ RaftActorRecoveryCohort cohort) {
this.context = context;
this.currentBehavior = currentBehavior;
this.cohort = cohort;
boolean handleRecoveryMessage(Object message) {
boolean recoveryComplete = false;
- if(persistence.isRecoveryApplicable()) {
+ if(context.getPersistenceProvider().isRecoveryApplicable()) {
if (message instanceof SnapshotOffer) {
onRecoveredSnapshot((SnapshotOffer) message);
} else if (message instanceof ReplicatedLogEntry) {
// The replicated log can be used later on to retrieve this snapshot
// when we need to install it on a peer
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
class RaftActorSnapshotMessageSupport {
static final String COMMIT_SNAPSHOT = "commit_snapshot";
- private final DataPersistenceProvider persistence;
private final RaftActorContext context;
private final RaftActorBehavior currentBehavior;
private final RaftActorSnapshotCohort cohort;
}
};
- RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
- RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
- this.persistence = persistence;
+ RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
+ RaftActorSnapshotCohort cohort) {
this.context = context;
this.currentBehavior = currentBehavior;
this.cohort = cohort;
this.log = context.getLogger();
+
+ context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
}
boolean handleSnapshotMessage(Object message) {
} else if (message instanceof SaveSnapshotFailure) {
onSaveSnapshotFailure((SaveSnapshotFailure) message);
return true;
- } else if (message instanceof CaptureSnapshot) {
- onCaptureSnapshot(message);
- return true;
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
return true;
} else if (message.equals(COMMIT_SNAPSHOT)) {
- context.getSnapshotManager().commit(persistence, -1);
+ context.getSnapshotManager().commit(-1);
return true;
} else {
return false;
private void onCaptureSnapshotReply(byte[] snapshotBytes) {
log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
- context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
- }
-
- private void onCaptureSnapshot(Object message) {
- log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
-
- context.getSnapshotManager().create(createSnapshotProcedure);
+ context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
}
private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
long sequenceNumber = success.metadata().sequenceNr();
- context.getSnapshotManager().commit(persistence, sequenceNumber);
+ context.getSnapshotManager().commit(sequenceNumber);
}
private void onApplySnapshot(Snapshot snapshot) {
cohort.applySnapshot(snapshot.getState());
//clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
- currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
context.setLastApplied(snapshot.getLastAppliedIndex());
}
}
import akka.japi.Procedure;
import java.util.Collections;
import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
private long dataSizeSinceLastSnapshot = 0L;
private final RaftActorContext context;
- private final DataPersistenceProvider persistence;
private final RaftActorBehavior currentBehavior;
private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
};
static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
- DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ RaftActorBehavior currentBehavior) {
return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
- snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+ snapshot.getUnAppliedEntries(), context, currentBehavior);
}
- static ReplicatedLog newInstance(RaftActorContext context,
- DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ static ReplicatedLog newInstance(RaftActorContext context, RaftActorBehavior currentBehavior) {
return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
- persistence, currentBehavior);
+ currentBehavior);
}
private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
- RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ RaftActorContext context, RaftActorBehavior currentBehavior) {
super(snapshotIndex, snapshotTerm, unAppliedEntries);
this.context = context;
- this.persistence = persistence;
this.currentBehavior = currentBehavior;
}
// FIXME: Maybe this should be done after the command is saved
long adjustedIndex = removeFrom(logEntryIndex);
if(adjustedIndex >= 0) {
- persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+ context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
}
}
// persist call and the execution(s) of the associated event
// handler. This also holds for multiple persist calls in context
// of a single command.
- persistence.persist(replicatedLogEntry,
+ context.getPersistenceProvider().persist(replicatedLogEntry,
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
public class SnapshotManager implements SnapshotState {
private final SnapshotState IDLE = new Idle();
- private final SnapshotState CAPTURING = new Capturing();
private final SnapshotState PERSISTING = new Persisting();
private final SnapshotState CREATING = new Creating();
private CaptureSnapshot captureSnapshot;
private long lastSequenceNumber = -1;
+ private Procedure<Void> createSnapshotProcedure;
+
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
this.LOG = logger;
}
@Override
- public void create(Procedure<Void> callback) {
- currentState.create(callback);
+ public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+ currentState.persist(snapshotBytes, currentBehavior, totalMemory);
}
@Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
- RaftActorBehavior currentBehavior, long totalMemory) {
- currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
- }
-
- @Override
- public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
- currentState.commit(persistenceProvider, sequenceNumber);
+ public void commit(long sequenceNumber) {
+ currentState.commit(sequenceNumber);
}
@Override
return currentState.trimLog(desiredTrimIndex, currentBehavior);
}
+ public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+ this.createSnapshotProcedure = createSnapshotProcedure;
+ }
+
+ @VisibleForTesting
+ public CaptureSnapshot getCaptureSnapshot() {
+ return captureSnapshot;
+ }
+
private boolean hasFollowers(){
return context.getPeerAddresses().keySet().size() > 0;
}
}
@Override
- public void create(Procedure<Void> callback) {
- LOG.debug("create should not be called in state {}", this);
- }
-
- @Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
- RaftActorBehavior currentBehavior, long totalMemory) {
+ public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
LOG.debug("persist should not be called in state {}", this);
}
@Override
- public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ public void commit(long sequenceNumber) {
LOG.debug("commit should not be called in state {}", this);
}
lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
- SnapshotManager.this.currentState = CAPTURING;
-
if(captureSnapshot.isInstallSnapshotInitiated()) {
LOG.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
- context.getActor().tell(captureSnapshot, context.getActor());
+ try {
+ createSnapshotProcedure.apply(null);
+ } catch (Exception e) {
+ LOG.error("Error creating snapshot", e);
+ return false;
+ }
+ SnapshotManager.this.currentState = CREATING;
return true;
}
}
}
- private class Capturing extends AbstractSnapshotState {
-
- @Override
- public boolean isCapturing() {
- return true;
- }
-
- @Override
- public void create(Procedure<Void> callback) {
- try {
- callback.apply(null);
- SnapshotManager.this.currentState = CREATING;
- } catch (Exception e) {
- LOG.error("Unexpected error occurred", e);
- }
- }
-
- @Override
- public String toString() {
- return "Capturing";
- }
-
- }
-
private class Creating extends AbstractSnapshotState {
@Override
}
@Override
- public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
- RaftActorBehavior currentBehavior, long totalMemory) {
+ public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
- persistenceProvider.saveSnapshot(sn);
+ context.getPersistenceProvider().saveSnapshot(sn);
LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
private class Persisting extends AbstractSnapshotState {
@Override
- public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ public void commit(long sequenceNumber) {
context.getReplicatedLog().snapshotCommit();
- persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+ context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
- persistenceProvider.deleteMessages(lastSequenceNumber);
+ context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
lastSequenceNumber = -1;
SnapshotManager.this.currentState = IDLE;
package org.opendaylight.controller.cluster.raft;
-import akka.japi.Procedure;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
public interface SnapshotState {
*/
boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
- /**
- * Create the snapshot
- *
- * @param callback a procedure to be called which should create the snapshot
- */
- void create(Procedure<Void> callback);
-
/**
* Persist the snapshot
*
- * @param persistenceProvider
* @param snapshotBytes
* @param currentBehavior
* @param totalMemory
*/
- void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
- ,long totalMemory);
+ void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory);
/**
* Commit the snapshot by trimming the log
*
- * @param persistenceProvider
* @param sequenceNumber
*/
- void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+ void commit(long sequenceNumber);
/**
* Rollback the snapshot
public SnapshotManager getSnapshotManager() {
if(this.snapshotManager == null){
this.snapshotManager = new SnapshotManager(this, getLogger());
+ this.snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
}
return this.snapshotManager;
}
context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
-1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
- support = new RaftActorRecoverySupport(mockPersistence, context , mockBehavior, mockCohort);
+ support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
}
private void sendMessageToSupport(Object message) {
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
-import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
}
};
- support = new RaftActorSnapshotMessageSupport(mockPersistence, context, mockBehavior, mockCohort);
+ support = new RaftActorSnapshotMessageSupport(context, mockBehavior, mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
}
private void sendMessageToSupport(Object message) {
verify(mockCohort).applySnapshot(snapshotBytes);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testOnCaptureSnapshot() throws Exception {
-
- sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
-
- ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
- verify(mockSnapshotManager).create(procedure.capture());
-
- procedure.getValue().apply(null);
-
- verify(mockCohort).createSnapshot(same(mockRaftActorRef));
- }
-
@Test
public void testOnCaptureSnapshotReply() {
byte[] snapshot = {1,2,3,4,5};
sendMessageToSupport(new CaptureSnapshotReply(snapshot));
- verify(mockSnapshotManager).persist(same(mockPersistence), same(snapshot), same(mockBehavior), anyLong());
+ verify(mockSnapshotManager).persist(same(snapshot), same(mockBehavior), anyLong());
}
@Test
long sequenceNumber = 100;
sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
- verify(mockSnapshotManager).commit(mockPersistence, sequenceNumber);
+ verify(mockSnapshotManager).commit(sequenceNumber);
}
@Test
sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
- verify(mockSnapshotManager).commit(mockPersistence, -1);
+ verify(mockSnapshotManager).commit(-1);
}
@Test
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
- , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
+ leader, Runtime.getRuntime().totalMemory());
assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
send2InitialPayloads();
// Block these messages initially so we can control the sequence.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
-
- // First, deliver the CaptureSnapshot to the leader.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
// Send another payload.
MockPayload payload4 = sendPayloadData(leaderActor, "four");
}
@Test
- public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
-
- send2InitialPayloads();
-
- // Block these messages initially so we can control the sequence.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
- follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
-
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
-
- // This should trigger a snapshot.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
-
- // Send another payload.
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
-
- MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
-
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
-
- // First, deliver the AppendEntries to the follower
- follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
-
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
-
- // Now deliver the CaptureSnapshot to the leader.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
- // Wait for snapshot complete.
- MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
-
- reinstateLeaderActor();
-
- assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
- assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
- assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
- assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
- assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
- assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
-
- // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
- // were included in the snapshot. They were also included as unapplied entries in the snapshot as
- // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
- // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
- // This is a side effect of trimming the persisted log to the sequence number captured at the time
- // the snapshot was initiated.
- assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
- payload3, payload4), leaderActor.underlyingActor().getState());
- }
-
- @Test
- public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+ public void testStatePersistedAfterSnapshotPersisted() {
send2InitialPayloads();
@SuppressWarnings("unchecked")
@Test
public void testAppendAndPersistExpectingNoCapture() throws Exception {
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
}
});
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
int dataSize = 600;
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
@Test
public void testRemoveFromAndPersist() throws Exception {
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
expSnapshotState.add(payload6);
// Delay the CaptureSnapshot message to the leader actor.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
// Send the payload.
payload7 = sendPayloadData(leaderActor, "seven");
- // Capture the CaptureSnapshot message so we can send it later.
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
+ // Capture the CaptureSnapshotReply message so we can send it later.
+ CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
+ CaptureSnapshotReply.class);
// Wait for the state to be applied in the leader.
ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
- // Now deliver the CaptureSnapshot.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
- // Wait for CaptureSnapshotReply to complete.
- MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
+ // Now deliver the CaptureSnapshotReply.
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+ leaderActor.tell(captureSnapshotReply, leaderActor);
// Wait for snapshot complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
- expSnapshotState.add(payload7);
-
// Verify the persisted snapshot. This should reflect the snapshot index as the last applied
// log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
// when the snapshot is created (ie when the CaptureSnapshot is processed).
assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
+ expSnapshotState.add(payload7);
+
testLog.info("testSecondSnapshot ending");
}
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
doReturn(actorRef).when(mockRaftActorContext).getActor();
+ snapshotManager.setCreateSnapshotCallable(mockProcedure);
}
@After
}
@Test
- public void testCaptureToInstall(){
+ public void testCaptureToInstall() throws Exception {
// Force capturing toInstall = true
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
assertEquals(true, snapshotManager.isCapturing());
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ verify(mockProcedure).apply(null);
+
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
// LastIndex and LastTerm are picked up from the lastLogEntry
assertEquals(0L, captureSnapshot.getLastIndex());
}
@Test
- public void testCapture(){
+ public void testCapture() throws Exception {
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
assertEquals(true, snapshotManager.isCapturing());
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ verify(mockProcedure).apply(null);
+
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
+
// LastIndex and LastTerm are picked up from the lastLogEntry
assertEquals(9L, captureSnapshot.getLastIndex());
assertEquals(1L, captureSnapshot.getLastTerm());
}
+ @Test
+ public void testCaptureWithCreateProcedureError () throws Exception {
+ doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertFalse(capture);
+
+ assertEquals(false, snapshotManager.isCapturing());
+
+ verify(mockProcedure).apply(null);
+ }
+
@Test
public void testIllegalCapture() throws Exception {
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
assertTrue(capture);
- List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+ verify(mockProcedure).apply(null);
- assertEquals(1, allMatching.size());
+ reset(mockProcedure);
// This will not cause snapshot capture to start again
capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
assertFalse(capture);
- allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
-
- assertEquals(1, allMatching.size());
+ verify(mockProcedure, never()).apply(null);
}
@Test
// when replicatedToAllIndex = -1
snapshotManager.capture(lastLogEntry, -1);
- snapshotManager.create(mockProcedure);
-
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
}
-
- @Test
- public void testCreate() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure).apply(null);
-
- assertEquals("isCapturing", true, snapshotManager.isCapturing());
- }
-
- @Test
- public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
-
- snapshotManager.create(mockProcedure);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, times(1)).apply(null);
- }
-
- @Test
- public void testCallingCreateBeforeCapture() throws Exception {
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, times(0)).apply(null);
- }
-
- @Test
- public void testCallingCreateAfterPersist() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, times(1)).apply(null);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
-
- reset(mockProcedure);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, never()).apply(null);
- }
-
@Test
public void testPersistWhenReplicatedToAllIndexNotMinus(){
doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
new MockRaftActorContext.MockPayload()), 9);
- snapshotManager.create(mockProcedure);
-
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.create(mockProcedure);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
assertTrue(capture);
- snapshotManager.create(mockProcedure);
-
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
@Test
public void testCallingPersistWithoutCaptureWillDoNothing(){
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
-
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testCommitBeforeCapture(){
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, never()).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, times(1)).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
assertTrue(raftBehavior instanceof Leader);
- MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
}
@Test
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
- assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
@Test
final YangInstanceIdentifier path = message.getPath();
try {
- Boolean exists = transaction.exists(path).checkedGet();
- DataExistsReply dataExistsReply = new DataExistsReply(exists);
+ boolean exists = transaction.exists(path).checkedGet();
+ DataExistsReply dataExistsReply = DataExistsReply.create(exists);
getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
dataExistsReply, getSelf());
} catch (ReadFailedException e) {
getSender().tell(new akka.actor.Status.Failure(e),getSelf());
}
-
}
private static class ShardTransactionCreator implements Creator<ShardTransaction> {
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class DataExistsReply implements SerializableMessage{
+public class DataExistsReply implements SerializableMessage {
public static final Class<ShardTransactionMessages.DataExistsReply> SERIALIZABLE_CLASS =
ShardTransactionMessages.DataExistsReply.class;
+ private static final DataExistsReply TRUE = new DataExistsReply(true, null);
+ private static final DataExistsReply FALSE = new DataExistsReply(false, null);
+ private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_TRUE =
+ ShardTransactionMessages.DataExistsReply.newBuilder().setExists(true).build();
+ private static final ShardTransactionMessages.DataExistsReply SERIALIZABLE_FALSE =
+ ShardTransactionMessages.DataExistsReply.newBuilder().setExists(false).build();
+
private final boolean exists;
- public DataExistsReply(final boolean exists) {
+ private DataExistsReply(final boolean exists, final Void dummy) {
this.exists = exists;
}
+ /**
+ * @deprecated Use {@link #create(boolean)} instead.
+ * @param exists
+ */
+ @Deprecated
+ public DataExistsReply(final boolean exists) {
+ this(exists, null);
+ }
+
+ public static DataExistsReply create(final boolean exists) {
+ return exists ? TRUE : FALSE;
+ }
+
public boolean exists() {
return exists;
}
- @Override public Object toSerializable() {
- return ShardTransactionMessages.DataExistsReply.newBuilder()
- .setExists(exists).build();
+ @Override
+ public Object toSerializable() {
+ return exists ? SERIALIZABLE_TRUE : SERIALIZABLE_FALSE;
}
- public static DataExistsReply fromSerializable(final Object serializable){
+ public static DataExistsReply fromSerializable(final Object serializable) {
ShardTransactionMessages.DataExistsReply o = (ShardTransactionMessages.DataExistsReply) serializable;
- return new DataExistsReply(o.getExists());
+ return create(o.getExists());
}
-
}
}
protected Future<Object> dataExistsSerializedReply(boolean exists) {
- return Futures.successful(new DataExistsReply(exists).toSerializable());
+ return Futures.successful(DataExistsReply.create(exists).toSerializable());
}
protected Future<DataExistsReply> dataExistsReply(boolean exists) {
- return Futures.successful(new DataExistsReply(exists));
+ return Futures.successful(DataExistsReply.create(exists));
}
protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
return Futures.successful(new BatchedModificationsReply(count));
}
- protected Future<Object> incompleteFuture(){
+ protected Future<Object> incompleteFuture() {
return mock(Future.class);
}
OperationCompleter completer = new OperationCompleter(operationLimiter );
- completer.onComplete(null, new DataExistsReply(true));
+ completer.onComplete(null, DataExistsReply.create(true));
assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
- completer.onComplete(null, new DataExistsReply(true));
+ completer.onComplete(null, DataExistsReply.create(true));
assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
completer.onComplete(null, new IllegalArgumentException());
import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree;
import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode;
import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeSnapshot;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
* Callback notifying the subclass that the specified registration is being closed and it's user no longer
- * wishes to receive notifications. This notification is invoked while the {@link ListenerRegistration#close()}
+ * wishes to receive notifications. This notification is invoked while the {@link org.opendaylight.yangtools.concepts.ListenerRegistration#close()}
* method is executing. Subclasses can use this callback to properly remove any delayed notifications pending
* towards the registration.
*
}
@Override
- public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+ public final <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
// Take the write lock
takeLock();
try {
}
@Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
- return changePublisher.registerTreeChangeListener(treeId, listener);
+ public synchronized <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+ /*
+ * Make sure commit is not occurring right now. Listener has to be
+ * registered and its state capture enqueued at a consistent point.
+ */
+ return changePublisher.registerTreeChangeListener(treeId, listener, dataTree.takeSnapshot());
}
@Override
return name + "-" + txCounter.getAndIncrement();
}
- private static void warnDebugContext(AbstractDOMStoreTransaction<?> transaction) {
+ private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
final Throwable ctx = transaction.getDebugContext();
if (ctx != null) {
LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import com.google.common.base.Optional;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations, final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
- final DataTreeCandidate candidate = new DefaultDataTreeCandidate(path, node);
+ final DataTreeCandidate candidate = DataTreeCandidates.newDataTreeCandidate(path, node);
for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations);
// FIXME: remove the queue for this registration and make sure we clear it
}
+ <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener, final DataTreeSnapshot snapshot) {
+ final AbstractDOMDataTreeChangeListenerRegistration<L> reg = registerTreeChangeListener(treeId, listener);
+
+ final Optional<NormalizedNode<?, ?>> node = snapshot.readNode(treeId);
+ if (node.isPresent()) {
+ final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(treeId, node.get());
+ notificationManager.submitNotification(reg, candidate);
+ }
+
+ return reg;
+ }
+
synchronized void publishChange(@Nonnull final DataTreeCandidate candidate) {
// Runs synchronized with registrationRemoved()
processCandidateTree(candidate);
package org.opendaylight.controller.netconf.cli.reader.custom;
import static org.opendaylight.controller.netconf.cli.io.IOUtil.isSkipInput;
-
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.netconf.cli.reader.ReadingException;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
filterPartsQNames.add(qName);
}
- List<NormalizedNode<?, ?>> previous = readInnerNode(rawValue);
+ List<? extends NormalizedNode<?, ?>> previous = readInnerNode(rawValue);
for (final QName qName : Lists.reverse(filterPartsQNames).subList(1, filterPartsQNames.size())) {
previous = Collections.<NormalizedNode<?, ?>>singletonList(
);
}
- final DataContainerChild<?, ?> newNode = previous == null ? null
- : ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(schemaNode.getQName()))
- .withValue((Collection) previous).build();
+ if (previous == null) {
+ return Collections.singletonList(null);
+ }
+
+ final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> builder = ImmutableContainerNodeBuilder.create();
+ builder.withNodeIdentifier(new NodeIdentifier(schemaNode.getQName()));
+ builder.withValue((Collection<DataContainerChild<?, ?>>) previous);
- return Collections.<NormalizedNode<?, ?>> singletonList(newNode);
+ return Collections.<NormalizedNode<?, ?>> singletonList(builder.build());
}
private List<NormalizedNode<?, ?>> readInnerNode(final String pathString) throws ReadingException {
this.timer = timer;
}
+ protected Timer getTimer() {
+ return timer;
+ }
+
@Override
public Future<NetconfClientSession> createClient(final NetconfClientConfiguration clientConfiguration) {
switch (clientConfiguration.getProtocol()) {
public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfClientSession, NetconfClientSessionListener> {
- public static final Set<String> CLIENT_CAPABILITIES = ImmutableSet.of(
+ public static final Set<String> EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
+ public static final Set<String> LEGACY_EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
+
+ public static final Set<String> DEFAULT_CLIENT_CAPABILITIES = ImmutableSet.of(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
+
+ public static final Set<String> LEGACY_FRAMING_CLIENT_CAPABILITIES = ImmutableSet.of(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0);
+
private static final Logger LOG = LoggerFactory.getLogger(NetconfClientSessionNegotiatorFactory.class);
private static final String START_EXI_MESSAGE_ID = "default-start-exi";
private static final EXIOptions DEFAULT_OPTIONS;
DEFAULT_OPTIONS = opts;
}
+ private final Set<String> clientCapabilities;
+
public NetconfClientSessionNegotiatorFactory(final Timer timer,
final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
final long connectionTimeoutMillis) {
this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS);
}
+ public NetconfClientSessionNegotiatorFactory(final Timer timer,
+ final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+ final long connectionTimeoutMillis, final Set<String> capabilities) {
+ this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS, capabilities);
+
+ }
+
public NetconfClientSessionNegotiatorFactory(final Timer timer,
final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
final long connectionTimeoutMillis, final EXIOptions exiOptions) {
+ this(timer, additionalHeader, connectionTimeoutMillis, exiOptions, EXI_CLIENT_CAPABILITIES);
+ }
+
+ public NetconfClientSessionNegotiatorFactory(final Timer timer,
+ final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+ final long connectionTimeoutMillis, final EXIOptions exiOptions, final Set<String> capabilities) {
this.timer = Preconditions.checkNotNull(timer);
this.additionalHeader = additionalHeader;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.options = exiOptions;
+ this.clientCapabilities = capabilities;
}
@Override
NetconfMessage startExiMessage = NetconfStartExiMessage.create(options, START_EXI_MESSAGE_ID);
NetconfHelloMessage helloMessage = null;
try {
- helloMessage = NetconfHelloMessage.createClientHello(CLIENT_CAPABILITIES, additionalHeader);
+ helloMessage = NetconfHelloMessage.createClientHello(clientCapabilities, additionalHeader);
} catch (NetconfDocumentedException e) {
- LOG.error("Unable to create client hello message with capabilities {} and additional handler {}",CLIENT_CAPABILITIES,additionalHeader);
+ LOG.error("Unable to create client hello message with capabilities {} and additional handler {}", clientCapabilities,additionalHeader);
throw new IllegalStateException(e);
}
--- /dev/null
+<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+<module>
+<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
+<name>name{MSG_ID}</name>
+<name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
+</module>
+</modules>
\ No newline at end of file
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-connector-config</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-netconf-connector</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>logback-config</artifactId>
<shadedClassifierName>executable</shadedClassifierName>
</configuration>
</execution>
+
+ <execution>
+ <id>stress-client</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <shadedArtifactId>stress-client</shadedArtifactId>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.opendaylight.controller.netconf.test.tool.client.stress.StressClient</mainClass>
+ </transformer>
+ </transformers>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>executable</shadedClassifierName>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncExecutionStrategy implements ExecutionStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class);
+
+ private final Parameters params;
+ private final List<NetconfMessage> preparedMessages;
+ private final NetconfDeviceCommunicator sessionListener;
+ private final List<Integer> editBatches;
+
+ public AsyncExecutionStrategy(final Parameters params, final List<NetconfMessage> editConfigMsgs, final NetconfDeviceCommunicator sessionListener) {
+ this.params = params;
+ this.preparedMessages = editConfigMsgs;
+ this.sessionListener = sessionListener;
+ this.editBatches = countEditBatchSizes(params);
+ }
+
+ private static List<Integer> countEditBatchSizes(final Parameters params) {
+ final List<Integer> editBatches = Lists.newArrayList();
+ if (params.editBatchSize != params.editCount) {
+ final int fullBatches = params.editCount / params.editBatchSize;
+ for (int i = 0; i < fullBatches; i++) {
+ editBatches.add(params.editBatchSize);
+ }
+
+ if (params.editCount % params.editBatchSize != 0) {
+ editBatches.add(params.editCount % params.editBatchSize);
+ }
+ } else {
+ editBatches.add(params.editBatchSize);
+ }
+ return editBatches;
+ }
+
+ @Override
+ public void invoke() {
+ final AtomicInteger responseCounter = new AtomicInteger(0);
+ final List<ListenableFuture<RpcResult<NetconfMessage>>> futures = Lists.newArrayList();
+
+ int batchI = 0;
+ for (final Integer editBatch : editBatches) {
+ for (int i = 0; i < editBatch; i++) {
+ final int msgId = i + (batchI * params.editBatchSize);
+ final NetconfMessage msg = preparedMessages.get(msgId);
+ LOG.debug("Sending message {}", msgId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+ }
+ final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+ sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+ futures.add(netconfMessageFuture);
+ }
+ batchI++;
+ LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+ futures.add(sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+ }
+
+ LOG.info("All batches sent. Waiting for responses");
+ // Wait for every future
+ for (final ListenableFuture<RpcResult<NetconfMessage>> future : futures) {
+ try {
+ final RpcResult<NetconfMessage> netconfMessageRpcResult = future.get(params.msgTimeout, TimeUnit.SECONDS);
+ if(netconfMessageRpcResult.isSuccessful()) {
+ responseCounter.incrementAndGet();
+ LOG.debug("Received response {}", responseCounter.get());
+ } else {
+ LOG.warn("Request failed {}", netconfMessageRpcResult);
+ }
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (final ExecutionException | TimeoutException e) {
+ throw new RuntimeException("Request not finished", e);
+ }
+ }
+
+ Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.Timer;
+import java.util.Set;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionNegotiatorFactory;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+
+public class ConfigurableClientDispatcher extends NetconfClientDispatcherImpl {
+
+ private final Set<String> capabilities;
+
+ private ConfigurableClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer, final Set<String> capabilities) {
+ super(bossGroup, workerGroup, timer);
+ this.capabilities = capabilities;
+ }
+
+ /**
+ * EXI + chunked framing
+ */
+ public static ConfigurableClientDispatcher createChunkedExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.EXI_CLIENT_CAPABILITIES);
+ }
+
+ /**
+ * EXI + ]]>]]> framing
+ */
+ public static ConfigurableClientDispatcher createLegacyExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_EXI_CLIENT_CAPABILITIES);
+ }
+
+ /**
+ * Chunked framing
+ */
+ public static ConfigurableClientDispatcher createChunked(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.DEFAULT_CLIENT_CAPABILITIES);
+ }
+
+ /**
+ * ]]>]]> framing
+ */
+ public static ConfigurableClientDispatcher createLegacy(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_FRAMING_CLIENT_CAPABILITIES);
+ }
+
+ @Override
+ protected NetconfClientSessionNegotiatorFactory getNegotiatorFactory(final NetconfClientConfiguration cfg) {
+ return new NetconfClientSessionNegotiatorFactory(getTimer(), cfg.getAdditionalHeader(), cfg.getConnectionTimeoutMillis(), capabilities);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+/**
+ * Created by mmarsale on 18.4.2015.
+ */
+public interface ExecutionStrategy {
+ void invoke();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.annotation.Arg;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+
+public class Parameters {
+
+ @Arg(dest = "ip")
+ public String ip;
+
+ @Arg(dest = "port")
+ public int port;
+
+ @Arg(dest = "edit-count")
+ public int editCount;
+
+ @Arg(dest = "edit-content")
+ public File editContent;
+
+ @Arg(dest = "edit-batch-size")
+ public int editBatchSize;
+
+ @Arg(dest = "debug")
+ public boolean debug;
+
+ @Arg(dest = "legacy-framing")
+ public boolean legacyFraming;
+
+ @Arg(dest = "exi")
+ public boolean exi;
+
+ @Arg(dest = "async")
+ public boolean async;
+
+ @Arg(dest = "ssh")
+ public boolean ssh;
+
+ @Arg(dest = "msg-timeout")
+ public long msgTimeout;
+
+ static ArgumentParser getParser() {
+ final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
+
+ parser.description("Netconf stress client");
+
+ parser.addArgument("--ip")
+ .type(String.class)
+ .setDefault("127.0.0.1")
+ .type(String.class)
+ .help("Netconf server IP")
+ .dest("ip");
+
+ parser.addArgument("--port")
+ .type(Integer.class)
+ .setDefault(2830)
+ .type(Integer.class)
+ .help("Netconf server port")
+ .dest("port");
+
+ parser.addArgument("--edits")
+ .type(Integer.class)
+ .setDefault(50000)
+ .type(Integer.class)
+ .help("Netconf edit rpcs to be sent")
+ .dest("edit-count");
+
+ parser.addArgument("--edit-content")
+ .type(File.class)
+ .setDefault(new File("edit.txt"))
+ .type(File.class)
+ .dest("edit-content");
+
+ parser.addArgument("--edit-batch-size")
+ .type(Integer.class)
+ .required(false)
+ .setDefault(-1)
+ .type(Integer.class)
+ .dest("edit-batch-size");
+
+ parser.addArgument("--debug")
+ .type(Boolean.class)
+ .setDefault(false)
+ .help("Whether to use debug log level instead of INFO")
+ .dest("debug");
+
+ parser.addArgument("--legacy-framing")
+ .type(Boolean.class)
+ .setDefault(false)
+ .dest("legacy-framing");
+
+ parser.addArgument("--exi")
+ .type(Boolean.class)
+ .setDefault(false)
+ .dest("exi");
+
+ parser.addArgument("--async-requests")
+ .type(Boolean.class)
+ .setDefault(true)
+ .dest("async");
+
+ parser.addArgument("--msg-timeout")
+ .type(Integer.class)
+ .setDefault(60)
+ .dest("msg-timeout");
+
+ parser.addArgument("--ssh")
+ .type(Boolean.class)
+ .setDefault(false)
+ .dest("ssh");
+
+ // TODO add get-config option instead of edit + commit
+ // TODO different edit config content
+
+ return parser;
+ }
+
+ void validate() {
+ Preconditions.checkArgument(port > 0, "Port =< 0");
+ Preconditions.checkArgument(editCount > 0, "Edit count =< 0");
+ if (editBatchSize == -1) {
+ editBatchSize = editCount;
+ } else {
+ Preconditions.checkArgument(editBatchSize <= editCount, "Edit count =< 0");
+ }
+
+ Preconditions.checkArgument(editContent.exists(), "Edit content file missing");
+ Preconditions.checkArgument(editContent.isDirectory() == false, "Edit content file is a dir");
+ Preconditions.checkArgument(editContent.canRead(), "Edit content file is unreadable");
+ // TODO validate
+ }
+
+ public InetSocketAddress getInetAddress() {
+ try {
+ return new InetSocketAddress(InetAddress.getByName(ip), port);
+ } catch (final UnknownHostException e) {
+ throw new IllegalArgumentException("Unknown ip", e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import ch.qos.logback.classic.Level;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.xml.sax.SAXException;
+
+public final class StressClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StressClient.class);
+
+ static final QName COMMIT_QNAME = QName.create(CommitInput.QNAME, "commit");
+ public static final NetconfMessage COMMIT_MSG;
+
+ static {
+ try {
+ COMMIT_MSG = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"commit-batch\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <commit/>\n" +
+ "</rpc>"));
+ } catch (SAXException | IOException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ static final QName EDIT_QNAME = QName.create(EditConfigInput.QNAME, "edit-config");
+ static final org.w3c.dom.Document editBlueprint;
+
+ static {
+ try {
+ editBlueprint = XmlUtil.readXmlToDocument(
+ "<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <edit-config xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <target>\n" +
+ " <candidate/>\n" +
+ " </target>\n" +
+ " <config/>\n" +
+ " </edit-config>\n" +
+ "</rpc>");
+ } catch (SAXException | IOException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}";
+ private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}";
+
+ public static void main(final String[] args) {
+ final Parameters params = parseArgs(args, Parameters.getParser());
+ params.validate();
+
+ // TODO remove
+ try {
+ Thread.sleep(10000);
+ } catch (final InterruptedException e) {
+// e.printStackTrace();
+ }
+
+ final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+ root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
+
+ LOG.info("Preparing messages");
+ // Prepare all msgs up front
+ final List<NetconfMessage> preparedMessages = Lists.newArrayListWithCapacity(params.editCount);
+
+ final String editContentString;
+ boolean needsModification = false;
+ try {
+ editContentString = Files.toString(params.editContent, Charsets.UTF_8);
+ if(editContentString.contains(MSG_ID_PLACEHOLDER)) {
+ needsModification = true;
+ };
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Cannot read content of " + params.editContent);
+ }
+
+ for (int i = 0; i < params.editCount; i++) {
+ final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
+ msg.getDocumentElement().setAttribute("message-id", Integer.toString(i));
+ final NetconfMessage netconfMessage = new NetconfMessage(msg);
+
+ final Element editContentElement;
+ try {
+ // Insert message id where needed
+ final String specificEditContent = needsModification ?
+ editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) :
+ editContentString;
+
+ editContentElement = XmlUtil.readXmlToElement(specificEditContent);
+ final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
+ getElementsByTagName("config").item(0);
+ config.appendChild(msg.importNode(editContentElement, true));
+ } catch (final IOException | SAXException e) {
+ throw new IllegalArgumentException("Edit content file is unreadable", e);
+ }
+
+ preparedMessages.add(netconfMessage);
+
+ }
+
+
+ final NioEventLoopGroup nioGroup = new NioEventLoopGroup();
+ final Timer timer = new HashedWheelTimer();
+
+ final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer);
+
+ final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress());
+
+ final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener);
+
+ LOG.info("Connecting to netconf server {}:{}", params.ip, params.port);
+ final NetconfClientSession netconfClientSession;
+ try {
+ netconfClientSession = netconfClientDispatcher.createClient(cfg).get();
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException("Unable to connect", e);
+ }
+
+ LOG.info("Starting stress test");
+ final Stopwatch started = Stopwatch.createStarted();
+ getExecutionStrategy(params, preparedMessages, sessionListener).invoke();
+ started.stop();
+
+ LOG.info("FINISHED. Execution time: {}", started);
+ LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS)));
+
+ // Cleanup
+ netconfClientSession.close();
+ timer.stop();
+ try {
+ nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("Unable to close executor properly", e);
+ }
+ }
+
+ private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+ if(params.async) {
+ return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
+ } else {
+ return new SyncExecutionStrategy(params, preparedMessages, sessionListener);
+ }
+ }
+
+ private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) {
+ final NetconfClientDispatcherImpl netconfClientDispatcher;
+ if(params.exi) {
+ if(params.legacyFraming) {
+ netconfClientDispatcher= ConfigurableClientDispatcher.createLegacyExi(nioGroup, nioGroup, timer);
+ } else {
+ netconfClientDispatcher = ConfigurableClientDispatcher.createChunkedExi(nioGroup, nioGroup, timer);
+ }
+ } else {
+ if(params.legacyFraming) {
+ netconfClientDispatcher = ConfigurableClientDispatcher.createLegacy(nioGroup, nioGroup, timer);
+ } else {
+ netconfClientDispatcher = ConfigurableClientDispatcher.createChunked(nioGroup, nioGroup, timer);
+ }
+ }
+ return netconfClientDispatcher;
+ }
+
+ private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {
+ final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
+ netconfClientConfigurationBuilder.withSessionListener(sessionListener);
+ netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
+ netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
+ netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
+ netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+ return netconfClientConfigurationBuilder.build();
+ }
+
+ static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
+ final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new LoggingRemoteDevice();
+ return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+ }
+
+ private static Parameters parseArgs(final String[] args, final ArgumentParser parser) {
+ final Parameters opt = new Parameters();
+ try {
+ parser.parseArgs(args, opt);
+ return opt;
+ } catch (final ArgumentParserException e) {
+ parser.handleError(e);
+ }
+
+ System.exit(1);
+ return null;
+ }
+
+
+ private static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+ @Override
+ public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) {
+ LOG.info("Session established");
+ }
+
+ @Override
+ public void onRemoteSessionDown() {
+ LOG.info("Session down");
+ }
+
+ @Override
+ public void onRemoteSessionFailed(final Throwable throwable) {
+ LOG.info("Session failed");
+ }
+
+ @Override
+ public void onNotification(final NetconfMessage notification) {
+ LOG.info("Notification received: {}", notification.toString());
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO reuse code from org.opendaylight.controller.netconf.test.tool.client.stress.AsyncExecutionStrategy
+class SyncExecutionStrategy implements ExecutionStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(SyncExecutionStrategy.class);
+
+ private final Parameters params;
+ private final List<NetconfMessage> preparedMessages;
+ private final NetconfDeviceCommunicator sessionListener;
+ private final List<Integer> editBatches;
+
+ public SyncExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+ this.params = params;
+ this.preparedMessages = preparedMessages;
+ this.sessionListener = sessionListener;
+ editBatches = countEditBatchSizes(params);
+ }
+
+ private static List<Integer> countEditBatchSizes(final Parameters params) {
+ final List<Integer> editBatches = Lists.newArrayList();
+ if (params.editBatchSize != params.editCount) {
+ final int fullBatches = params.editCount / params.editBatchSize;
+ for (int i = 0; i < fullBatches; i++) {
+ editBatches.add(params.editBatchSize);
+ }
+
+ if (params.editCount % params.editBatchSize != 0) {
+ editBatches.add(params.editCount % params.editBatchSize);
+ }
+ } else {
+ editBatches.add(params.editBatchSize);
+ }
+ return editBatches;
+ }
+
+ public void invoke() {
+ final AtomicInteger responseCounter = new AtomicInteger(0);
+
+ int batchI = 0;
+ for (final Integer editBatch : editBatches) {
+ for (int i = 0; i < editBatch; i++) {
+ final int msgId = i + (batchI * params.editBatchSize);
+ final NetconfMessage msg = preparedMessages.get(msgId);
+ LOG.debug("Sending message {}", msgId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+ }
+ final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+ sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+ // Wait for response
+ waitForResponse(responseCounter, netconfMessageFuture);
+
+ }
+ batchI++;
+ LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+
+ // Commit batch sync
+ waitForResponse(responseCounter,
+ sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+ }
+
+ Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+ }
+
+ private void waitForResponse(AtomicInteger responseCounter, final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture) {
+ try {
+ final RpcResult<NetconfMessage> netconfMessageRpcResult =
+ netconfMessageFuture.get(params.msgTimeout, TimeUnit.SECONDS);
+ if (netconfMessageRpcResult.isSuccessful()) {
+ responseCounter.incrementAndGet();
+ LOG.debug("Received response {}", responseCounter.get());
+ } else {
+ LOG.warn("Request failed {}", netconfMessageRpcResult);
+ }
+
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (final ExecutionException | TimeoutException e) {
+ throw new RuntimeException("Request not finished", e);
+ }
+ }
+}
factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
factory.setXIncludeAware(false);
factory.setExpandEntityReferences(false);
+ // Performance improvement for messages with size <10k according to
+ // https://xerces.apache.org/xerces2-j/faq-performance.html
+ factory.setFeature("http://apache.org/xml/features/dom/defer-node-expansion", false);
} catch (ParserConfigurationException e) {
throw new ExceptionInInitializerError(e);
}
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<sourceDirectory>${project.basedir}</sourceDirectory>
<includes>**\/*.java,**\/*.xml,**\/*.ini,**\/*.sh,**\/*.bat,**\/*.yang</includes>
- <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java</excludes>
+ <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java, **\/netconf\/test\/tool\/client\/stress\/StressClient.java</excludes>
</configuration>
<dependencies>
<dependency>