<artifactId>karaf-parent</artifactId>
<name>${project.artifactId}</name>
<packaging>pom</packaging>
- <prerequisites>
- <maven>3.1.1</maven>
- </prerequisites>
+
<properties>
<branding.version>1.1.0-SNAPSHOT</branding.version>
<karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
</plugins>
</pluginManagement>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>${enforcer.version}</version>
+ <executions>
+ <execution>
+ <id>enforce-maven</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireMavenVersion>
+ <version>3.1.1</version>
+ </requireMavenVersion>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
</parent>
<artifactId>opendaylight-karaf-empty</artifactId>
<packaging>pom</packaging>
- <prerequisites>
- <maven>3.0</maven>
- </prerequisites>
<dependencies>
<dependency>
</parent>
<artifactId>distribution.opendaylight-karaf</artifactId>
<packaging>pom</packaging>
- <prerequisites>
- <maven>3.0</maven>
- </prerequisites>
<dependencies>
<dependency>
<groupId>org.opendaylight.odlparent</groupId>
<artifactId>features-parent</artifactId>
<version>1.5.0-SNAPSHOT</version>
+ <relativePath/>
</parent>
<groupId>${groupId}</groupId>
<artifactId>${artifactId}-features</artifactId>
-->
<snapshot>
<required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:${artifactId}:impl?module=${artifactId}-impl&revision=2014-12-10</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding?module=opendaylight-md-sal-binding&revision=2013-10-28</capability>
</required-capabilities>
<configuration>
<module>${artifactId}-features</module>
<module>${artifactId}-artifacts</module>
</modules>
+ <!-- DO NOT install or deploy the repo root pom as it's only needed to initiate a build -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
<artifactId>config-plugin-parent</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<build>
<pluginManagement>
<artifactId>logback-config-loader</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<artifactId>logback-config</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<artifactId>netty-config-api</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
<description>Configuration Wrapper around netty's event executor</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
<description>Configuration Wrapper around netty's event group</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
<description>Configuration Wrapper around netty's timer</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<version>0.3.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
+
<modules>
<module>config-api</module>
<module>config-manager</module>
<artifactId>threadpool-config-api</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<artifactId>threadpool-config-impl</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<name>${project.artifactId}</name>
<description>Remove generated source files, after new files generation, implementation is inserted.</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<name>${project.artifactId}</name>
<description>Artifact that contains only generated code from yang files. Suitable for testing.</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
private CaptureSnapshot captureSnapshot = null;
- private volatile boolean hasSnapshotCaptureInitiated = false;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
-
-
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
if(LOG.isDebugEnabled()) {
- LOG.debug("SnapshotOffer called..");
+ LOG.debug("{}: SnapshotOffer called..", persistenceId());
}
initRecoveryTimer();
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
}
replicatedLog.append(logEntry);
private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
- context.getLastApplied() + 1, ale.getToIndex());
+ LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
}
for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
ApplyState applyState = (ApplyState) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
}
} else if (message instanceof ApplyLogEntries){
ApplyLogEntries ale = (ApplyLogEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
}
persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
@Override
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
if(LOG.isDebugEnabled()) {
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
snapshot.getLastAppliedTerm()
);
}
} else if (message instanceof SaveSnapshotSuccess) {
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("SaveSnapshotSuccess received for snapshot");
+ LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
long sequenceNumber = success.metadata().sequenceNr();
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
- LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+ LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId());
context.getReplicatedLog().snapshotRollback();
- LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+ LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
} else if (message instanceof CaptureSnapshot) {
- LOG.info("CaptureSnapshot received by actor");
+ LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
if(captureSnapshot == null) {
captureSnapshot = (CaptureSnapshot)message;
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: message: {}", message.getClass());
+ LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
}
}
context.getTermInformation().getCurrentTerm(), data);
if(LOG.isDebugEnabled()) {
- LOG.debug("Persist data {}", replicatedLogEntry);
+ LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
}
final RaftActorContext raftContext = getRaftActorContext();
self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!hasSnapshotCaptureInitiated){
+ if(!context.isSnapshotCaptureInitiated()){
raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
} else {
- LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+ LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+ persistenceId(), getId());
}
} else if (clientActor != null) {
// Send message for replication
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
- leaderId, peerAddress);
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+ persistenceId(), leaderId, peerAddress);
}
return peerAddress;
}
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+ LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
persistence().saveSnapshot(sn);
- LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", captureSnapshot.getLastAppliedIndex(),
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
}
captureSnapshot = null;
- hasSnapshotCaptureInitiated = false;
+ context.setSnapshotCaptureInitiated(false);
}
protected boolean hasFollowers(){
final Procedure<ReplicatedLogEntry> callback) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+ LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
// when a snaphsot is being taken, captureSnapshot != null
- if (hasSnapshotCaptureInitiated == false &&
+ if (!context.isSnapshotCaptureInitiated() &&
( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
dataSizeForCheck > dataThreshold)) {
dataSizeSinceLastSnapshot = 0;
- LOG.info("Initiating Snapshot Capture..");
+ LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ",
- context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+ LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+ persistenceId(), context.getLastApplied());
+ LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+ lastAppliedIndex);
+ LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+ lastAppliedTerm);
}
// send a CaptureSnapshot to self to make the expensive operation async.
getSelf().tell(new CaptureSnapshot(
lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
null);
- hasSnapshotCaptureInitiated = true;
+ context.setSnapshotCaptureInitiated(true);
}
if(callback != null){
callback.apply(replicatedLogEntry);
@Override public void update(long currentTerm, String votedFor) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
}
this.currentTerm = currentTerm;
this.votedFor = votedFor;
*
* @param replicatedLog
*/
- public void setReplicatedLog(ReplicatedLog replicatedLog);
+ void setReplicatedLog(ReplicatedLog replicatedLog);
/**
* @return A representation of the log
*
* @param name
*/
- public void removePeer(String name);
+ void removePeer(String name);
/**
* Given a peerId return the corresponding actor
/**
* @return ConfigParams
*/
- public ConfigParams getConfigParams();
+ ConfigParams getConfigParams();
+
+ void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
+
+ boolean isSnapshotCaptureInitiated();
+
}
import akka.actor.Props;
import akka.actor.UntypedActorContext;
import akka.event.LoggingAdapter;
-
import java.util.Map;
import static com.google.common.base.Preconditions.checkState;
private final ConfigParams configParams;
+ private boolean snapshotCaptureInitiated;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
return configParams;
}
+ @Override
+ public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+ this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return snapshotCaptureInitiated;
+ }
+
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
private Optional<ByteString> snapshot;
+ private long replicatedToAllIndex = -1;
+
public AbstractLeader(RaftActorContext context) {
super(context);
leaderId = context.getId();
- LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
+ LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
return this;
if(! appendEntriesReply.isSuccess()) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntriesReply.toString());
+ LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
}
}
followerToLog.get(followerId);
if(followerLogInformation == null){
- LOG.error("Unknown follower {}", followerId);
+ LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
return this;
}
applyLogToStateMachine(context.getCommitIndex());
}
+ if (!context.isSnapshotCaptureInitiated()) {
+ purgeInMemoryLog();
+ }
+
return this;
}
+ private void purgeInMemoryLog() {
+ //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+ // we would delete the in-mem log from that index on, in-order to minimize mem usage
+ // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+ long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+ for (FollowerLogInformation info : followerToLog.values()) {
+ minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+ }
+
+ replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+ }
+
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
final Iterator<ClientRequestTracker> it = trackerList.iterator();
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+ rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
return switchBehavior(new Follower(context));
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
String followerId = reply.getFollowerId();
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+ if (followerToSnapshot == null) {
+ LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ context.getId(), followerId);
+ return;
+ }
+
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
followerLogInformation.markFollowerActive();
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+ if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
+ LOG.debug("{}: InstallSnapshotReply received, " +
"last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
+ context.getId(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1
);
}
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
- followerToLog.get(followerId).getNextIndex());
+ LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
+ context.getId(), followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
followerToSnapshot.markSendStatus(true);
}
} else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
+ LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+ context.getId(), reply.getChunkIndex());
followerToSnapshot.markSendStatus(false);
}
-
} else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
+ LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+ context.getId(), reply.getChunkIndex(), followerId,
+ followerToSnapshot.getChunkIndex());
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
+ LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
}
// Create a tracker entry we will use this later to notify the
private void sendAppendEntries() {
// Send an AppendEntries to all followers
+
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
ActorSelection followerActor = context.getPeerActorSelection(followerId);
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
- if (mapFollowerToSnapshot.get(followerId) != null) {
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ if (followerToSnapshot != null) {
// if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
sendSnapshotChunk(followerActor, followerId);
} else {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList());
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
}
} else {
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
final List<ReplicatedLogEntry> entries;
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
+ LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ context.getId(), leaderLastIndex, leaderSnapShotIndex);
+
+ if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+ LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
+ followerNextIndex, followerId);
+
// FIXME : Sending one entry at a time
entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
// then snapshot should be sent
if(LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
+ LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+ "follower-nextIndex: %s, leader-snapshot-index: %s, " +
+ "leader-last-index: %s", context.getId(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
}
actor().tell(new InitiateInstallSnapshot(), actor());
entries = Collections.<ReplicatedLogEntry>emptyList();
}
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
}
}
}
}
private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries) {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
+ List<ReplicatedLogEntry> entries, String followerId) {
+ AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex(), replicatedToAllIndex);
+
+ if(!entries.isEmpty()) {
+ LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+ appendEntries);
+ }
+
+ followerActor.tell(appendEntries.toSerializable(), actor());
}
/**
*
*/
private void installSnapshotIfNeeded() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+ }
+
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", e.getKey());
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
// on every install snapshot, we try to capture the snapshot.
// Once a capture is going on, another one issued will get ignored by RaftActor.
private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
+ ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+ // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+ // followerId to the followerToSnapshot map.
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks(),
- Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+ nextSnapshotChunk,
+ followerToSnapshot.incrementChunkIndex(),
+ followerToSnapshot.getTotalChunks(),
+ Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
);
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ context.getId(), followerActor.path(),
+ followerToSnapshot.getChunkIndex(),
+ followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
- LOG.error(e, "InstallSnapshot failed for Leader.");
+ LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
}
}
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
if (LOG.isDebugEnabled()) {
- LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
}
return nextChunk;
}
* snapshot chunks
*/
protected class FollowerToSnapshot {
- private ByteString snapshotBytes;
+ private final ByteString snapshotBytes;
private int offset = 0;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
private int replyReceivedForOffset;
// if replyStatus is false, the previous chunk is attempted
private boolean replyStatus = false;
private int chunkIndex;
- private int totalChunks;
+ private final int totalChunks;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
+ LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+ context.getId(), size, totalChunks);
}
replyReceivedForOffset = -1;
chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("length={}, offset={},size={}",
+ LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
snapshotLength, start, size);
}
ByteString substring = getSnapshotBytes().substring(start, start + size);
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Cannot append entries because sender term {} is less than {}",
- appendEntries.getTerm(), currentTerm());
+ LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
+ context.getId(), appendEntries.getTerm(), currentTerm());
}
sender.tell(
RequestVote requestVote) {
if(LOG.isDebugEnabled()) {
- LOG.debug(requestVote.toString());
+ LOG.debug("{}: Received {}", context.getId(), requestVote);
}
boolean grantVote = false;
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
LOG.warning(
- "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
+ "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ context.getId(), i, i, index);
break;
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Setting last applied to {}", newLastApplied);
+ LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
}
context.setLastApplied(newLastApplied);
try {
close();
} catch (Exception e) {
- LOG.error(e, "Failed to close behavior : {}", this.state());
+ LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
}
return behavior;
return numMajority;
}
+
+ protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+
+ // we would want to keep the lastApplied as its used while capturing snapshots
+ long tempMin = Math.min(minReplicatedToAllIndex,
+ (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+ context.getReplicatedLog().snapshotCommit();
+ return tempMin;
+ }
+ return currentReplicatedIndex;
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import java.util.Set;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import java.util.Set;
-
/**
* The behavior of a RaftActor when it is in the CandidateState
* <p/>
peers = context.getPeerAddresses().keySet();
if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Candidate has following peers: {}", peers);
+ LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers);
}
votesRequired = getMajorityVoteCount(peers.size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
return this;
RaftRPC rpc = (RaftRPC) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+ LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc,
+ context.getTermInformation().getCurrentTerm());
}
// If RPC request or response contains term T > currentTerm:
context.getId());
if(LOG.isDebugEnabled()) {
- LOG.debug("Starting new term {}", (currentTerm + 1));
+ LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1));
}
// Request for a vote
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
}
// it's log.
if(LOG.isDebugEnabled()) {
- LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
// prevLogIndex entry was not found in it's log
if(LOG.isDebugEnabled()) {
- LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , prevLogTerm
+ "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , context.getId(), prevLogTerm
, appendEntries.getPrevLogTerm());
}
} else {
// We found that the log was out of sync so just send a negative
// reply and return
if(LOG.isDebugEnabled()) {
- LOG.debug("Follower ({}) is out-of-sync, " +
+ LOG.debug("{}: Follower ({}) is out-of-sync, " +
"so sending negative reply, lastIndex():{}, lastTerm():{}",
- context.getId(), lastIndex(), lastTerm()
+ context.getId(), context.getId(), lastIndex(), lastTerm()
);
}
sender.tell(
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Number of entries to be appended = {}", appendEntries.getEntries().size()
- );
+ LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+ appendEntries.getEntries().size());
}
// 3. If an existing entry conflicts with a new one (same index
}
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Removing entries from log starting at {}", matchEntry.getIndex()
- );
+ LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+ matchEntry.getIndex());
}
// Entries do not match so remove all subsequent entries
}
if(LOG.isDebugEnabled()) {
- LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
- );
+ LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+ (addEntriesFrom + lastIndex()));
}
// 4. Append any new entries not already in the log
i < appendEntries.getEntries().size(); i++) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ LOG.debug("{}: Append entry to log {}", context.getId(),
+ appendEntries.getEntries().get(i).getData());
}
context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+ LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
}
}
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Commit index set to {}", context.getCommitIndex());
+ LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
}
}
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("applyLogToStateMachine, " +
+ LOG.debug("{}: applyLogToStateMachine, " +
"appendEntries.getLeaderCommit():{}," +
- "context.getLastApplied():{}, lastIndex():{}",
+ "context.getLastApplied():{}, lastIndex():{}", context.getId(),
appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
);
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
+ if (!context.isSnapshotCaptureInitiated()) {
+ fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+ }
+
return this;
}
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshot received by follower " +
- "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ LOG.debug("{}: InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
);
}
snapshotTracker = null;
} catch (Exception e){
-
- LOG.error(e, "Exception in InstallSnapshot of follower:");
+ LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
if (originalMessage instanceof IsolatedLeaderCheck) {
if (isLeaderIsolated()) {
- LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
- minIsolatedLeaderPeerCount, leaderId);
+ LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), minIsolatedLeaderPeerCount, leaderId);
return switchBehavior(new IsolatedLeader(context));
}
}
// leader's commitIndex
private final long leaderCommit;
+ // index which has been replicated successfully to all followers, -1 if none
+ private final long replicatedToAllIndex;
+
public AppendEntries(long term, String leaderId, long prevLogIndex,
- long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+ long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
super(term);
this.leaderId = leaderId;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.entries = entries;
this.leaderCommit = leaderCommit;
+ this.replicatedToAllIndex = replicatedToAllIndex;
}
private void writeObject(ObjectOutputStream out) throws IOException {
return leaderCommit;
}
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
@Override
public String toString() {
final StringBuilder sb =
sb.append(", prevLogTerm=").append(prevLogTerm);
sb.append(", entries=").append(entries);
sb.append(", leaderCommit=").append(leaderCommit);
+ sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
sb.append('}');
return sb.toString();
}
from.getPrevLogIndex(),
from.getPrevLogTerm(),
logEntryList,
- from.getLeaderCommit());
+ from.getLeaderCommit(), -1);
return to;
}
}
+ @Test
+ public void testSnapshotPreCommit() {
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+ replicatedLogImpl.snapshotPreCommit(4, 3);
+ assertEquals(3, replicatedLogImpl.size());
+ assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+
+ replicatedLogImpl.snapshotPreCommit(6, 3);
+ assertEquals(1, replicatedLogImpl.size());
+ assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+
+ replicatedLogImpl.snapshotPreCommit(7, 3);
+ assertEquals(0, replicatedLogImpl.size());
+ assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+ //running it again on an empty list should not throw exception
+ replicatedLogImpl.snapshotPreCommit(7, 3);
+ assertEquals(0, replicatedLogImpl.size());
+ assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+
+ }
+
// create a snapshot for test
public Map<Long, String> takeSnapshot(final int numEntries) {
Map<Long, String> map = new HashMap<>(numEntries);
private ReplicatedLog replicatedLog;
private Map<String, String> peerAddresses = new HashMap<>();
private ConfigParams configParams;
+ private boolean snapshotCaptureInitiated;
public MockRaftActorContext(){
electionTerm = null;
return configParams;
}
+ @Override
+ public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+ this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return snapshotCaptureInitiated;
+ }
+
public void setConfigParams(ConfigParams configParams) {
this.configParams = configParams;
}
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class RaftActorTest extends AbstractActorTest {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
+ private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
private static final long serialVersionUID = 1L;
}
}
- public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+ public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+ DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
this.delegate = mock(RaftActor.class);
}
}
+ public void waitForInitializeBehaviorComplete() {
+ try {
+ assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
public List<Object> getState() {
return state;
}
recoveryComplete.countDown();
}
+ @Override
+ protected void initializeBehavior() {
+ super.initializeBehavior();
+ initializeBehaviorComplete.countDown();
+ }
+
@Override
protected void applyRecoverySnapshot(byte[] bytes) {
delegate.applyRecoverySnapshot(bytes);
// 4 messages as part of snapshot, which are applied to state
ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
+ new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
}};
}
+ @Test
+ public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "leader1";
+
+ ActorRef followerActor1 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(4);
+ leaderActor.getRaftActorContext().setLastApplied(4);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
+ leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+ verify(leaderActor.delegate).createSnapshot();
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ //fake snapshot on index 5
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ //fake snapshot on index 6
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ assertEquals(8, leaderActor.getReplicatedLog().size());
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ // capture snapshot reply should remove the snapshotted entries only
+ assertEquals(3, leaderActor.getReplicatedLog().size());
+ assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
+
+ // add another non-replicated entry
+ leaderActor.getReplicatedLog().append(
+ new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+
+ //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
+ assertEquals(2, leaderActor.getReplicatedLog().size());
+ assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+ @Test
+ public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "follower1";
+
+ ActorRef leaderActor1 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("leader", leaderActor1.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor followerActor = mockActorRef.underlyingActor();
+ followerActor.getRaftActorContext().setCommitIndex(4);
+ followerActor.getRaftActorContext().setLastApplied(4);
+ followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ followerActor.waitForInitializeBehaviorComplete();
+
+ // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+ Follower follower = new Follower(followerActor.getRaftActorContext());
+ followerActor.setCurrentBehavior(follower);
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
+
+ // log as indices 0-5
+ assertEquals(6, followerActor.getReplicatedLog().size());
+
+ //snapshot on 4
+ followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
+ followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+ verify(followerActor.delegate).createSnapshot();
+
+ assertEquals(6, followerActor.getReplicatedLog().size());
+
+ //fake snapshot on index 6
+ List<ReplicatedLogEntry> entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("foo-6"))
+ );
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
+ assertEquals(7, followerActor.getReplicatedLog().size());
+
+ //fake snapshot on index 7
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+ new MockRaftActorContext.MockPayload("foo-7"))
+ );
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
+ assertEquals(8, followerActor.getReplicatedLog().size());
+
+ assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ // capture snapshot reply should remove the snapshotted entries only
+ assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
+ assertEquals(7, followerActor.getReplicatedLog().lastIndex());
+
+ entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
+ new MockRaftActorContext.MockPayload("foo-7"))
+ );
+ // send an additional entry 8 with leaderCommit = 7
+ followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
+
+ // 7 and 8, as lastapplied is 7
+ assertEquals(2, followerActor.getReplicatedLog().size());
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
context.getTermInformation().update(1000, "test");
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+ new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
RaftActorBehavior behavior = createBehavior(context);
}};
}
+ @Test
+ public void testFakeSnapshots() {
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+ AbstractRaftActorBehavior behavior = new Leader(context);
+ context.getTermInformation().update(1, "leader");
+
+ //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ context.setLastApplied(0);
+ assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setLastApplied(0);
+ assertEquals(-1, behavior.fakeSnapshot(0, -1));
+ assertEquals(2, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+ context.setLastApplied(1);
+ assertEquals(0, behavior.fakeSnapshot(0, -1));
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+ context.setLastApplied(2);
+ assertEquals(1, behavior.fakeSnapshot(3, 1));
+ assertEquals(3, context.getReplicatedLog().size());
+
+
+ }
+
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
ActorRef actorRef, RaftRPC rpc) {
}
protected AppendEntries createAppendEntriesWithNewerTerm() {
- return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
}
protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+
import static org.junit.Assert.assertEquals;
public class CandidateTest extends AbstractRaftActorBehaviorTest {
Candidate candidate = new Candidate(createActorContext(getTestActor()));
- candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0));
+ candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
// do not put code outside this method, will run afterwards
// The new commitIndex is 101
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 100, 1, entries, 101);
+ new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
RaftActorBehavior raftBehavior =
createBehavior(context).handleMessage(getRef(), appendEntries);
// AppendEntries is now sent with a bigger term
// this will set the receivers term to be the same as the sender's term
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
RaftActorBehavior behavior = createBehavior(context);
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+ new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
RaftActorBehavior behavior = createBehavior(context);
new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
RaftActorBehavior behavior = createBehavior(context);
*/
package org.opendaylight.controller.cluster.raft.messages;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
/**
* Unit tests for AppendEntries.
*
ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
- AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L);
+ AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
@Test
public void testToAndFromSerializable() {
AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
- Collections.<ReplicatedLogEntry>emptyList(), 10L);
+ Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
assertSame("toSerializable", entries, entries.toSerializable());
assertSame("fromSerializable", entries,
@Test
public void testToAndFromLegacySerializable() {
ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
- AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L);
+ AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
+ assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
- <groupId>org.opendaylight.controller.model</groupId>
- <artifactId>model-flow-service</artifactId>
- <scope>provided</scope>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-test-model</artifactId>
+ <version>${mdsal.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>opendaylight-l2-types</artifactId>
</dependency>
</dependencies>
}
+ /**
+ * @return option containing models for testing purposes
+ */
+ public static Option salTestModelBundles() {
+ return new DefaultCompositeOption( //
+ mavenBundle(CONTROLLER, "sal-test-model").versionAsInProject()
+ );
+
+ }
+
public static Option baseModelBundles() {
return new DefaultCompositeOption( //
mavenBundle(YANGTOOLS_MODELS, "yang-ext").versionAsInProject(), // //
mavenBundle(YANGTOOLS_MODELS, "ietf-inet-types").versionAsInProject(), // //
mavenBundle(YANGTOOLS_MODELS, "ietf-yang-types").versionAsInProject(), // //
- mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject(), // //
- mavenBundle(CONTROLLER_MODELS, "model-inventory").versionAsInProject());
+ mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject() // //
+ );
}
public static Option junitAndMockitoBundles() {
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles;
-import static org.opendaylight.controller.test.sal.binding.it.TestHelper.flowCapableModelBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.salTestModelBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.junitAndMockitoBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
configMinumumBundles(),
// BASE Models
baseModelBundles(),
- flowCapableModelBundles(),
+ salTestModelBundles(),
// Set fail if unresolved bundle present
systemProperty("pax.exam.osgi.unresolved.fail").value("true"),
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import com.google.inject.Inject;
import java.util.concurrent.Future;
-import org.junit.Before;
-import org.junit.Ignore;
+
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import com.google.inject.Inject;
+
+/**
+ * covers creating, reading and deleting of an item in dataStore
+ */
public class DataServiceTest extends AbstractTest {
protected DataBrokerService consumerDataService;
-
@Inject
Broker broker2;
- @Before
- public void setUp() throws Exception {
- }
-
- /*
+ /**
*
* Ignored this, because classes here are constructed from
* very different class loader as MD-SAL is run into,
* this is code is run from different classloader.
*
+ * @throws Exception
*/
@Test
- @Ignore
public void test() throws Exception {
BindingAwareConsumer consumer1 = new BindingAwareConsumer() {
consumerDataService = session.getSALService(DataBrokerService.class);
}
};
- broker.registerConsumer(consumer1, getBundleContext());
+ broker.registerConsumer(consumer1);
assertNotNull(consumerDataService);
DataModificationTransaction transaction = consumerDataService.beginTransaction();
assertNotNull(transaction);
- InstanceIdentifier<Node> node1 = createNodeRef("0");
- DataObject node = consumerDataService.readConfigurationData(node1);
+ InstanceIdentifier<UnorderedList> node1 = createNodeRef("0");
+ DataObject node = consumerDataService.readConfigurationData(node1);
assertNull(node);
- Node nodeData1 = createNode("0");
+ UnorderedList nodeData1 = createNode("0");
transaction.putConfigurationData(node1, nodeData1);
Future<RpcResult<TransactionStatus>> commitResult = transaction.commit();
assertNotNull(result.getResult());
assertEquals(TransactionStatus.COMMITED, result.getResult());
- Node readedData = (Node) consumerDataService.readConfigurationData(node1);
+ UnorderedList readedData = (UnorderedList) consumerDataService.readConfigurationData(node1);
assertNotNull(readedData);
assertEquals(nodeData1.getKey(), readedData.getKey());
DataModificationTransaction transaction2 = consumerDataService.beginTransaction();
- assertNotNull(transaction);
+ assertNotNull(transaction2);
transaction2.removeConfigurationData(node1);
DataObject readedData2 = consumerDataService.readConfigurationData(node1);
assertNull(readedData2);
-
-
}
- private static InstanceIdentifier<Node> createNodeRef(final String string) {
- NodeKey key = new NodeKey(new NodeId(string));
- return InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
+ private static InstanceIdentifier<UnorderedList> createNodeRef(final String string) {
+ UnorderedListKey key = new UnorderedListKey(string);
+ return InstanceIdentifier.builder(Lists.class).child(UnorderedContainer.class).child(UnorderedList.class, key).build();
}
- private static Node createNode(final String string) {
- NodeBuilder ret = new NodeBuilder();
- NodeId id = new NodeId(string);
- ret.setKey(new NodeKey(id));
- ret.setId(id);
+ private static UnorderedList createNode(final String string) {
+ UnorderedListBuilder ret = new UnorderedListBuilder();
+ UnorderedListKey nodeKey = new UnorderedListKey(string);
+ ret.setKey(nodeKey);
+ ret.setName("name of " + string);
+ ret.setName("value of " + string);
return ret.build();
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.NotificationService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OpendaylightTestNotificationListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotificationBuilder;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-@Ignore
+/**
+ * covers registering of notification listener, publishing of notification and receiving of notification.
+ */
public class NotificationTest extends AbstractTest {
- private final FlowListener listener1 = new FlowListener();
- private final FlowListener listener2 = new FlowListener();
+ private static final Logger LOG = LoggerFactory
+ .getLogger(NotificationTest.class);
- private ListenerRegistration<NotificationListener> listener1Reg;
- private ListenerRegistration<NotificationListener> listener2Reg;
+ protected final NotificationTestListener listener1 = new NotificationTestListener();
+ protected final NotificationTestListener listener2 = new NotificationTestListener();
- private NotificationProviderService notifyProviderService;
+ protected ListenerRegistration<NotificationListener> listener1Reg;
+ protected ListenerRegistration<NotificationListener> listener2Reg;
- @Before
- public void setUp() throws Exception {
- }
+ protected NotificationProviderService notifyProviderService;
+ /**
+ * test of delivering of notification
+ * @throws Exception
+ */
@Test
public void notificationTest() throws Exception {
- /**
- *
- * The registration of the Provider 1.
- *
- */
+ LOG.info("The registration of the Provider 1.");
AbstractTestProvider provider1 = new AbstractTestProvider() {
@Override
public void onSessionInitiated(ProviderContext session) {
};
// registerProvider method calls onSessionInitiated method above
- broker.registerProvider(provider1, getBundleContext());
+ broker.registerProvider(provider1);
assertNotNull(notifyProviderService);
- /**
- *
- * The registration of the Consumer 1. It retrieves Notification Service
- * from MD-SAL and registers SalFlowListener as notification listener
- *
- */
+ LOG.info("The registration of the Consumer 1. It retrieves Notification Service "
+ + "from MD-SAL and registers OpendaylightTestNotificationListener as notification listener");
BindingAwareConsumer consumer1 = new BindingAwareConsumer() {
@Override
public void onSessionInitialized(ConsumerContext session) {
}
};
// registerConsumer method calls onSessionInitialized method above
- broker.registerConsumer(consumer1, getBundleContext());
+ broker.registerConsumer(consumer1);
assertNotNull(listener1Reg);
- /**
- * The notification of type FlowAdded with cookie ID 0 is created. The
- * delay 100ms to make sure that the notification was delivered to
- * listener.
- */
- notifyProviderService.publish(flowAdded(0));
+ LOG.info("The notification of type FlowAdded with cookie ID 0 is created. The "
+ + "delay 100ms to make sure that the notification was delivered to "
+ + "listener.");
+ notifyProviderService.publish(noDustNotification("rainy day", 42));
Thread.sleep(100);
/**
* Check that one notification was delivered and has correct cookie.
*
*/
- assertEquals(1, listener1.addedFlows.size());
- assertEquals(0, listener1.addedFlows.get(0).getCookie().getValue().intValue());
+ assertEquals(1, listener1.notificationBag.size());
+ assertEquals("rainy day", listener1.notificationBag.get(0).getReason());
+ assertEquals(42, listener1.notificationBag.get(0).getDaysTillNewDust().intValue());
- /**
- * The registration of the Consumer 2. SalFlowListener is registered
- * registered as notification listener.
- */
+ LOG.info("The registration of the Consumer 2. SalFlowListener is registered "
+ + "registered as notification listener.");
BindingAwareProvider provider = new BindingAwareProvider() {
@Override
};
// registerConsumer method calls onSessionInitialized method above
- broker.registerProvider(provider, getBundleContext());
+ broker.registerProvider(provider);
- /**
- * 3 notifications are published
- */
- notifyProviderService.publish(flowAdded(5));
- notifyProviderService.publish(flowAdded(10));
- notifyProviderService.publish(flowAdded(2));
+ LOG.info("3 notifications are published");
+ notifyProviderService.publish(noDustNotification("rainy day", 5));
+ notifyProviderService.publish(noDustNotification("rainy day", 10));
+ notifyProviderService.publish(noDustNotification("tax collector", 2));
/**
* The delay 100ms to make sure that the notifications were delivered to
* received 4 in total, second 3 in total).
*
*/
- assertEquals(4, listener1.addedFlows.size());
- assertEquals(3, listener2.addedFlows.size());
+ assertEquals(4, listener1.notificationBag.size());
+ assertEquals(3, listener2.notificationBag.size());
/**
* The second listener is closed (unregistered)
*/
listener2Reg.close();
- /**
- *
- * The notification 5 is published
- */
- notifyProviderService.publish(flowAdded(10));
+ LOG.info("The notification 5 is published");
+ notifyProviderService.publish(noDustNotification("entomologist hunt", 10));
/**
* The delay 100ms to make sure that the notification was delivered to
* second consumer because its listener was unregistered.
*
*/
- assertEquals(5, listener1.addedFlows.size());
- assertEquals(3, listener2.addedFlows.size());
+ assertEquals(5, listener1.notificationBag.size());
+ assertEquals(3, listener2.notificationBag.size());
}
/**
- * Creates instance of the type FlowAdded. Only cookie value is set. It is
+ * Creates instance of the type OutOfPixieDustNotification. It is
* used only for testing purpose.
*
- * @param i
- * cookie value
- * @return instance of the type FlowAdded
+ * @param reason
+ * @param days
+ * @return instance of the type OutOfPixieDustNotification
*/
- public static FlowAdded flowAdded(int i) {
- FlowAddedBuilder ret = new FlowAddedBuilder();
- ret.setCookie(new FlowCookie(BigInteger.valueOf(i)));
+ public static OutOfPixieDustNotification noDustNotification(String reason, int days) {
+ OutOfPixieDustNotificationBuilder ret = new OutOfPixieDustNotificationBuilder();
+ ret.setReason(reason).setDaysTillNewDust(days);
return ret.build();
}
/**
*
* Implements
- * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
- * SalFlowListener} and contains attributes which keep lists of objects of
- * the type
- * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819. NodeFlow
- * NodeFlow}. The lists are defined for flows which were added, removed or
- * updated.
+ * {@link OpendaylightTestNotificationListener} and contains attributes which keep lists of objects of
+ * the type {@link OutOfFairyDustNotification}.
*/
- private static class FlowListener implements SalFlowListener {
-
- List<FlowAdded> addedFlows = new ArrayList<>();
- List<FlowRemoved> removedFlows = new ArrayList<>();
- List<FlowUpdated> updatedFlows = new ArrayList<>();
-
- @Override
- public void onFlowAdded(FlowAdded notification) {
- addedFlows.add(notification);
- }
-
- @Override
- public void onFlowRemoved(FlowRemoved notification) {
- removedFlows.add(notification);
- };
-
- @Override
- public void onFlowUpdated(FlowUpdated notification) {
- updatedFlows.add(notification);
- }
-
- @Override
- public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
- // TODO Auto-generated method stub
-
- }
+ public static class NotificationTestListener implements OpendaylightTestNotificationListener {
- @Override
- public void onNodeErrorNotification(NodeErrorNotification notification) {
- // TODO Auto-generated method stub
-
- }
+ List<OutOfPixieDustNotification> notificationBag = new ArrayList<>();
@Override
- public void onNodeExperimenterErrorNotification(
- NodeExperimenterErrorNotification notification) {
- // TODO Auto-generated method stub
-
+ public void onOutOfPixieDustNotification(OutOfPixieDustNotification arg0) {
+ notificationBag.add(arg0);
}
}
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import java.math.BigInteger;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.OpendaylightTestRoutedRpcService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.TestContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * covers routed rpc creation, registration, invocation, unregistration
+ */
public class RoutedServiceTest extends AbstractTest {
- private SalFlowService salFlowService1;
- private SalFlowService salFlowService2;
+ private static final Logger LOG = LoggerFactory
+ .getLogger(RoutedServiceTest.class);
- private SalFlowService consumerService;
+ protected OpendaylightTestRoutedRpcService odlRoutedService1;
+ protected OpendaylightTestRoutedRpcService odlRoutedService2;
- private RoutedRpcRegistration<SalFlowService> firstReg;
- private RoutedRpcRegistration<SalFlowService> secondReg;
+ protected OpendaylightTestRoutedRpcService consumerService;
+ protected RoutedRpcRegistration<OpendaylightTestRoutedRpcService> firstReg;
+ protected RoutedRpcRegistration<OpendaylightTestRoutedRpcService> secondReg;
+
+ /**
+ * prepare mocks
+ */
@Before
- public void setUp() throws Exception {
- salFlowService1 = mock(SalFlowService.class, "First Flow Service");
- salFlowService2 = mock(SalFlowService.class, "Second Flow Service");
+ public void setUp() {
+ odlRoutedService1 = mock(OpendaylightTestRoutedRpcService.class, "First Flow Service");
+ odlRoutedService2 = mock(OpendaylightTestRoutedRpcService.class, "Second Flow Service");
}
@Test
assertNotNull(getBroker());
BindingAwareProvider provider1 = new AbstractTestProvider() {
-
@Override
public void onSessionInitiated(ProviderContext session) {
assertNotNull(session);
- firstReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService1);
+ firstReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService1);
}
};
- /**
- * Register provider 1 with first implementation of SalFlowService -
- * service1
- *
- */
- broker.registerProvider(provider1, getBundleContext());
+ LOG.info("Register provider 1 with first implementation of routeSimpleService - service1");
+ broker.registerProvider(provider1);
assertNotNull("Registration should not be null", firstReg);
- assertSame(salFlowService1, firstReg.getInstance());
+ assertSame(odlRoutedService1, firstReg.getInstance());
BindingAwareProvider provider2 = new AbstractTestProvider() {
-
@Override
public void onSessionInitiated(ProviderContext session) {
assertNotNull(session);
- secondReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService2);
+ secondReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService2);
}
};
- /**
- * Register provider 2 with first implementation of SalFlowService -
- * service2
- *
- */
- broker.registerProvider(provider2, getBundleContext());
+ LOG.info("Register provider 2 with second implementation of routeSimpleService - service2");
+ broker.registerProvider(provider2);
assertNotNull("Registration should not be null", firstReg);
- assertSame(salFlowService2, secondReg.getInstance());
+ assertSame(odlRoutedService2, secondReg.getInstance());
assertNotSame(secondReg, firstReg);
BindingAwareConsumer consumer = new BindingAwareConsumer() {
@Override
public void onSessionInitialized(ConsumerContext session) {
- consumerService = session.getRpcService(SalFlowService.class);
+ consumerService = session.getRpcService(OpendaylightTestRoutedRpcService.class);
}
};
- broker.registerConsumer(consumer, getBundleContext());
+ LOG.info("Register routeService consumer");
+ broker.registerConsumer(consumer);
- assertNotNull("MD-SAL instance of Flow Service should be returned", consumerService);
- assertNotSame("Provider instance and consumer instance should not be same.", salFlowService1, consumerService);
+ assertNotNull("MD-SAL instance of test Service should be returned", consumerService);
+ assertNotSame("Provider instance and consumer instance should not be same.", odlRoutedService1, consumerService);
- NodeRef nodeOne = createNodeRef("foo:node:1");
+ InstanceIdentifier<UnorderedList> nodeOnePath = createNodeRef("foo:node:1");
- /**
- * Provider 1 registers path of node 1
- */
- firstReg.registerPath(NodeContext.class, nodeOne.getValue());
+ LOG.info("Provider 1 registers path of node 1");
+ firstReg.registerPath(TestContext.class, nodeOnePath);
/**
* Consumer creates addFlow message for node one and sends it to the
* MD-SAL
- *
*/
- AddFlowInput addFlowFirstMessage = createSampleAddFlow(nodeOne, 1);
- consumerService.addFlow(addFlowFirstMessage);
+ RoutedSimpleRouteInput simpleRouteFirstFoo = createSimpleRouteInput(nodeOnePath);
+ consumerService.routedSimpleRoute(simpleRouteFirstFoo);
/**
* Verifies that implementation of the first provider received the same
* message from MD-SAL.
- *
*/
- verify(salFlowService1).addFlow(addFlowFirstMessage);
-
+ verify(odlRoutedService1).routedSimpleRoute(simpleRouteFirstFoo);
/**
* Verifies that second instance was not invoked with first message
- *
*/
- verify(salFlowService2, times(0)).addFlow(addFlowFirstMessage);
+ verify(odlRoutedService2, times(0)).routedSimpleRoute(simpleRouteFirstFoo);
- /**
- * Provider 2 registers path of node 2
- *
- */
- NodeRef nodeTwo = createNodeRef("foo:node:2");
- secondReg.registerPath(NodeContext.class, nodeTwo.getValue());
+ LOG.info("Provider 2 registers path of node 2");
+ InstanceIdentifier<UnorderedList> nodeTwo = createNodeRef("foo:node:2");
+ secondReg.registerPath(TestContext.class, nodeTwo);
/**
* Consumer sends message to nodeTwo for three times. Should be
* processed by second instance.
*/
- AddFlowInput AddFlowSecondMessage = createSampleAddFlow(nodeTwo, 2);
- consumerService.addFlow(AddFlowSecondMessage);
- consumerService.addFlow(AddFlowSecondMessage);
- consumerService.addFlow(AddFlowSecondMessage);
+ RoutedSimpleRouteInput simpleRouteSecondFoo = createSimpleRouteInput(nodeTwo);
+ consumerService.routedSimpleRoute(simpleRouteSecondFoo);
+ consumerService.routedSimpleRoute(simpleRouteSecondFoo);
+ consumerService.routedSimpleRoute(simpleRouteSecondFoo);
/**
* Verifies that second instance was invoked 3 times with second message
* and first instance wasn't invoked.
*
*/
- verify(salFlowService2, times(3)).addFlow(AddFlowSecondMessage);
- verify(salFlowService1, times(0)).addFlow(AddFlowSecondMessage);
+ verify(odlRoutedService2, times(3)).routedSimpleRoute(simpleRouteSecondFoo);
+ verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteSecondFoo);
- /**
- * Unregisteration of the path for the node one in the first provider
- *
- */
- firstReg.unregisterPath(NodeContext.class, nodeOne.getValue());
+ LOG.info("Unregistration of the path for the node one in the first provider");
+ firstReg.unregisterPath(TestContext.class, nodeOnePath);
- /**
- * Provider 2 registers path of node 1
- *
- */
- secondReg.registerPath(NodeContext.class, nodeOne.getValue());
+ LOG.info("Provider 2 registers path of node 1");
+ secondReg.registerPath(TestContext.class, nodeOnePath);
/**
* A consumer sends third message to node 1
- *
*/
- AddFlowInput AddFlowThirdMessage = createSampleAddFlow(nodeOne, 3);
- consumerService.addFlow(AddFlowThirdMessage);
+ RoutedSimpleRouteInput simpleRouteThirdFoo = createSimpleRouteInput(nodeOnePath);
+ consumerService.routedSimpleRoute(simpleRouteThirdFoo);
/**
* Verifies that provider 1 wasn't invoked and provider 2 was invoked 1
* time.
+ * TODO: fix unregister path
*/
- verify(salFlowService1, times(0)).addFlow(AddFlowThirdMessage);
- verify(salFlowService2).addFlow(AddFlowThirdMessage);
+ //verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteThirdFoo);
+ verify(odlRoutedService2).routedSimpleRoute(simpleRouteThirdFoo);
}
*
* @param string
* string with key(path)
- * @return instance of the type NodeRef
+ * @return instance identifier to {@link UnorderedList}
*/
- private static NodeRef createNodeRef(String string) {
- NodeKey key = new NodeKey(new NodeId(string));
- InstanceIdentifier<Node> path = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
-
- return new NodeRef(path);
+ private static InstanceIdentifier<UnorderedList> createNodeRef(String string) {
+ UnorderedListKey key = new UnorderedListKey(string);
+ InstanceIdentifier<UnorderedList> path = InstanceIdentifier.builder(Lists.class)
+ .child(UnorderedContainer.class)
+ .child(UnorderedList.class, key)
+ .build();
+
+ return path;
}
/**
*
* @param node
* NodeRef value
- * @param cookie
- * integer with cookie value
- * @return AddFlowInput instance
+ * @return simpleRouteInput instance
*/
- static AddFlowInput createSampleAddFlow(NodeRef node, int cookie) {
- AddFlowInputBuilder ret = new AddFlowInputBuilder();
- ret.setNode(node);
- ret.setCookie(new FlowCookie(BigInteger.valueOf(cookie)));
+ static RoutedSimpleRouteInput createSimpleRouteInput(InstanceIdentifier<UnorderedList> node) {
+ RoutedSimpleRouteInputBuilder ret = new RoutedSimpleRouteInputBuilder();
+ ret.setRoute(node);
return ret.build();
}
}
@Override public void onReceive(Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
if(LOG.isDebugEnabled()) {
- LOG.debug("Received message {}", messageType);
+// LOG.debug("Received message {}", messageType);
}
handleReceive(message);
if(LOG.isDebugEnabled()) {
- LOG.debug("Done handling message {}", messageType);
+// LOG.debug("Done handling message {}", messageType);
}
}
// The state of this Shard
private final InMemoryDOMDataStore store;
- private final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
/// The name of this shard
private final ShardIdentifier name;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
- LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
+ LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
}
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity());
+ datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
@Override
public void onReceiveRecover(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveRecover: Received message {} from {}",
- message.getClass().toString(),
- getSender());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
+ message.getClass().toString(), getSender());
}
if (message instanceof RecoveryFailure){
- LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+ LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
+ persistenceId());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
+ LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
}
if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("Current transaction {} has timed out after {} ms - aborting",
- cohortEntry.getTransactionID(), transactionCommitTimeout);
+ LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
- LOG.debug("Committing transaction {}", transactionID);
+ LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
// We're not the current Tx - the Tx was likely expired b/c it took too long in
// between the canCommit and commit messages.
IllegalStateException ex = new IllegalStateException(
- String.format("Cannot commit transaction %s - it is not the current transaction",
- transactionID));
+ String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
Shard.this.persistData(getSender(), transactionID,
new ModificationPayload(cohortEntry.getModification()));
}
- } catch (InterruptedException | ExecutionException | IOException e) {
- LOG.error(e, "An exception occurred while preCommitting transaction {}",
- cohortEntry.getTransactionID());
+ } catch (Exception e) {
+ LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("Could not finish committing transaction %s - no CohortEntry found",
- transactionID));
+ String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
return;
}
- LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
// We block on the future here so we don't have to worry about possibly accessing our
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- } catch (InterruptedException | ExecutionException e) {
+ } catch (Exception e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
- LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
shardMBean.incrementFailedTransactionsCount();
+ } finally {
+ commitCoordinator.currentTransactionComplete(transactionID, true);
}
-
- commitCoordinator.currentTransactionComplete(transactionID, true);
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
- LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+ LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
- ready.getTxnClientVersion());
+ LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+ ready.getTransactionID(), ready.getTxnClientVersion());
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// to provide the compatible behavior.
ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
- LOG.debug("Aborting transaction {}", transactionID);
+ LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
// aborted during replication in which case we may still commit locally if replication
@Override
public void onFailure(final Throwable t) {
- LOG.error(t, "An exception happened during abort");
+ LOG.error(t, "{}: An exception happened during abort", persistenceId());
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
- "Could not find shard leader so transaction cannot be created. This typically happens" +
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
" when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.")), getSelf());
+ " later.", persistenceId()))), getSelf());
}
}
.build();
if(LOG.isDebugEnabled()) {
- LOG.debug("Creating transaction : {} ", transactionId);
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
}
ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
}
}
private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+ LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
if(isLeader()) {
registration = doChangeListenerRegistration(registerChangeListener);
} else {
- LOG.debug("Shard is not the leader - delaying registration");
+ LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg =
new DelayedListenerRegistration(registerChangeListener);
ActorRef listenerRegistration = getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- listenerRegistration.path());
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
- LOG.debug("Registering for path {}", registerChangeListener.getPath());
+ LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
return store.registerChangeListener(registerChangeListener.getPath(), listener,
registerChangeListener.getScope());
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+ LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
}
}
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
- LOG.error("Unknown state received {} during recovery", data);
+ LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
}
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+ LOG.debug("{}: submitted recovery sbapshot", persistenceId());
}
}
@Override
protected void applyCurrentLogRecoveryBatch() {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+ LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
currentLogRecoveryBatch.size());
}
}
Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+ LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
}
for(DOMStoreWriteTransaction tx: txList) {
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
}
}
}
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
}
else if (data instanceof CompositeModificationPayload) {
applyModificationToState(clientActor, identifier, modification);
} else {
- LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- data, data.getClass().getClassLoader(),
+ LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
if(modification == null) {
LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
+ "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
- LOG.info("Applying snapshot");
+ LOG.info("{}: Applying snapshot", persistenceId());
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e, "An exception occurred when applying snapshot");
+ LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
} finally {
- LOG.info("Done applying snapshot");
+ LOG.info("{}: Done applying snapshot", persistenceId());
}
}
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
- "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- entry.getKey(), getId());
+ "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ persistenceId(), entry.getKey(), getId());
}
entry.getValue().close();
}
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.event.LoggingAdapter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
*/
public class ShardCommitCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
private final int queueCapacity;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+ private final LoggingAdapter log;
+
+ private final String name;
+
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
+ this.log = log;
+ this.name = name;
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Processing canCommit for transaction {} for shard {}",
- transactionID, shard.path());
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Processing canCommit for transaction {} for shard {}",
+ name, transactionID, shard.path());
}
// Lookup the cohort entry that was cached previously (or should have been) by
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("No cohort entry found for transaction %s", transactionID));
- LOG.error(ex.getMessage());
+ String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
return;
}
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
- LOG.debug("Transaction {} is already in progress - queueing transaction {}",
- currentCohortEntry.getTransactionID(), transactionID);
+ log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
+ name, currentCohortEntry.getTransactionID(), transactionID);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
- String.format("Could not enqueue transaction %s - the maximum commit queue"+
+ String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
- transactionID, queueCapacity));
- LOG.error(ex.getMessage());
+ name, transactionID, queueCapacity));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
}
} else {
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
- LOG.debug("An exception occurred during canCommit", e);
+ log.debug("{}: An exception occurred during canCommit: {}", name, e);
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
// Dequeue the next cohort entry waiting in the queue.
currentCohortEntry = queuedCohortEntries.poll();
if(currentCohortEntry != null) {
+ currentCohortEntry.updateLastAccessTime();
doCanCommit(currentCohortEntry);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.event.LoggingAdapter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
private static final int TIME_OUT = 10;
- private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
+ private final LoggingAdapter log;
+ private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+ String name) {
this.schemaContext = schemaContext;
this.shardName = shardName;
+ this.log = log;
+ this.name = name;
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true)
if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
return resultingTxList;
} else {
- LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+ log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
private final String shardName;
private final String memberName;
private final String type;
+ private final String fullName;
public ShardIdentifier(String shardName, String memberName, String type) {
this.shardName = shardName;
this.memberName = memberName;
this.type = type;
+
+ fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-")
+ .append(type).toString();
}
@Override
return result;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
//ensure the output of toString matches the pattern above
- return new StringBuilder(memberName)
- .append("-shard-")
- .append(shardName)
- .append("-")
- .append(type)
- .toString();
+ return fullName;
}
public static Builder builder(){
package org.opendaylight.controller.cluster.datastore;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
@Deprecated
public class CompositeModificationByteStringPayloadTest {
entries.add(new ReplicatedLogImplEntry(0, 1, payload));
- assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+ assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
}
}
});
AppendEntries appendEntries =
- new AppendEntries(1, "member-1", 0, 100, entries, 1);
+ new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
*/
package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
/**
* Unit tests for DataChangeListenerRegistrationProxy.
doReturn(Futures.failed(new RuntimeException("mock"))).
when(actorContext).executeOperationAsync(any(ActorRef.class),
any(Object.class), any(Timeout.class));
+ doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
AsyncDataBroker.DataChangeScope.ONE);
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
}
public static AppendEntries keyValueAppendEntries() {
}
});
- return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
}
}
</parent>
<modelVersion>4.0.0</modelVersion>
+ <artifactId>sal-test-model</artifactId>
+ <packaging>bundle</packaging>
+
<dependencies>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
</dependency>
</dependencies>
- <artifactId>sal-test-model</artifactId>
<build>
<plugins>
<plugin>
--- /dev/null
+module opendaylight-test-notification {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:test:bi:ba:notification";
+ prefix "ntf";
+
+ description
+ "Test model for testing of registering notification listener and publishing of notification.";
+
+ revision "2015-02-05" {
+ description
+ "Initial revision";
+ }
+
+ notification out-of-pixie-dust-notification {
+ description "Just a testing notification that we can not fly for now.";
+
+ leaf reason {
+ type string;
+ }
+
+ leaf days-till-new-dust {
+ type uint16;
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-subsystem</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>ietf-netconf-notifications</artifactId>
+ <packaging>bundle</packaging>
+ <name>${project.artifactId}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>ietf-netconf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>ietf-yang-types</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package>
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.*,
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.*,
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.*
+ </Export-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+module ietf-netconf-notifications {
+
+ namespace
+ "urn:ietf:params:xml:ns:yang:ietf-netconf-notifications";
+
+ prefix ncn;
+
+ import ietf-inet-types { prefix inet; }
+ import ietf-netconf { prefix nc; }
+
+ organization
+ "IETF NETCONF (Network Configuration Protocol) Working Group";
+
+ contact
+ "WG Web: <http://tools.ietf.org/wg/netconf/>
+ WG List: <mailto:netconf@ietf.org>
+
+ WG Chair: Bert Wijnen
+ <mailto:bertietf@bwijnen.net>
+
+ WG Chair: Mehmet Ersue
+ <mailto:mehmet.ersue@nsn.com>
+
+ Editor: Andy Bierman
+ <mailto:andy@netconfcentral.org>";
+
+ description
+ "This module defines a YANG data model for use with the
+ NETCONF protocol that allows the NETCONF client to
+ receive common NETCONF base event notifications.
+
+ Copyright (c) 2012 IETF Trust and the persons identified as
+ the document authors. All rights reserved.
+
+ Redistribution and use in source and binary forms, with or
+ without modification, is permitted pursuant to, and subject
+ to the license terms contained in, the Simplified BSD License
+
+
+
+ set forth in Section 4.c of the IETF Trust's Legal Provisions
+ Relating to IETF Documents
+ (http://trustee.ietf.org/license-info).
+
+ This version of this YANG module is part of RFC 6470; see
+ the RFC itself for full legal notices.";
+
+ revision "2012-02-06" {
+ description
+ "Initial version. Errata 3957 added.";
+ reference
+ "RFC 6470: NETCONF Base Notifications";
+ }
+
+ grouping common-session-parms {
+ description
+ "Common session parameters to identify a
+ management session.";
+
+ leaf username {
+ type string;
+ mandatory true;
+ description
+ "Name of the user for the session.";
+ }
+
+ leaf session-id {
+ type nc:session-id-or-zero-type;
+ mandatory true;
+ description
+ "Identifier of the session.
+ A NETCONF session MUST be identified by a non-zero value.
+ A non-NETCONF session MAY be identified by the value zero.";
+ }
+
+ leaf source-host {
+ type inet:ip-address;
+ description
+ "Address of the remote host for the session.";
+ }
+ }
+
+
+
+
+
+
+
+
+ grouping changed-by-parms {
+ description
+ "Common parameters to identify the source
+ of a change event, such as a configuration
+ or capability change.";
+
+ container changed-by {
+ description
+ "Indicates the source of the change.
+ If caused by internal action, then the
+ empty leaf 'server' will be present.
+ If caused by a management session, then
+ the name, remote host address, and session ID
+ of the session that made the change will be reported.";
+ choice server-or-user {
+ mandatory true;
+ leaf server {
+ type empty;
+ description
+ "If present, the change was caused
+ by the server.";
+ }
+
+ case by-user {
+ uses common-session-parms;
+ }
+ } // choice server-or-user
+ } // container changed-by-parms
+ }
+
+
+ notification netconf-config-change {
+ description
+ "Generated when the NETCONF server detects that the
+ <running> or <startup> configuration datastore
+ has been changed by a management session.
+ The notification summarizes the edits that
+ have been detected.
+
+ The server MAY choose to also generate this
+ notification while loading a datastore during the
+ boot process for the device.";
+
+ uses changed-by-parms;
+
+
+
+
+
+ leaf datastore {
+ type enumeration {
+ enum running {
+ description "The <running> datastore has changed.";
+ }
+ enum startup {
+ description "The <startup> datastore has changed";
+ }
+ }
+ default "running";
+ description
+ "Indicates which configuration datastore has changed.";
+ }
+
+ list edit {
+ description
+ "An edit record SHOULD be present for each distinct
+ edit operation that the server has detected on
+ the target datastore. This list MAY be omitted
+ if the detailed edit operations are not known.
+ The server MAY report entries in this list for
+ changes not made by a NETCONF session (e.g., CLI).";
+
+ leaf target {
+ type instance-identifier;
+ description
+ "Topmost node associated with the configuration change.
+ A server SHOULD set this object to the node within
+ the datastore that is being altered. A server MAY
+ set this object to one of the ancestors of the actual
+ node that was changed, or omit this object, if the
+ exact node is not known.";
+ }
+
+ leaf operation {
+ type nc:edit-operation-type;
+ description
+ "Type of edit operation performed.
+ A server MUST set this object to the NETCONF edit
+ operation performed on the target datastore.";
+ }
+ } // list edit
+ } // notification netconf-config-change
+
+
+
+
+
+
+ notification netconf-capability-change {
+ description
+ "Generated when the NETCONF server detects that
+ the server capabilities have changed.
+ Indicates which capabilities have been added, deleted,
+ and/or modified. The manner in which a server
+ capability is changed is outside the scope of this
+ document.";
+
+ uses changed-by-parms;
+
+ leaf-list added-capability {
+ type inet:uri;
+ description
+ "List of capabilities that have just been added.";
+ }
+
+ leaf-list deleted-capability {
+ type inet:uri;
+ description
+ "List of capabilities that have just been deleted.";
+ }
+
+ leaf-list modified-capability {
+ type inet:uri;
+ description
+ "List of capabilities that have just been modified.
+ A capability is considered to be modified if the
+ base URI for the capability has not changed, but
+ one or more of the parameters encoded at the end of
+ the capability URI have changed.
+ The new modified value of the complete URI is returned.";
+ }
+ } // notification netconf-capability-change
+
+
+ notification netconf-session-start {
+ description
+ "Generated when a NETCONF server detects that a
+ NETCONF session has started. A server MAY generate
+ this event for non-NETCONF management sessions.
+ Indicates the identity of the user that started
+ the session.";
+ uses common-session-parms;
+ } // notification netconf-session-start
+
+
+
+
+ notification netconf-session-end {
+ description
+ "Generated when a NETCONF server detects that a
+ NETCONF session has terminated.
+ A server MAY optionally generate this event for
+ non-NETCONF management sessions. Indicates the
+ identity of the user that owned the session,
+ and why the session was terminated.";
+
+ uses common-session-parms;
+
+ leaf killed-by {
+ when "../termination-reason = 'killed'";
+ type nc:session-id-type;
+ description
+ "The ID of the session that directly caused this session
+ to be abnormally terminated. If this session was abnormally
+ terminated by a non-NETCONF session unknown to the server,
+ then this leaf will not be present.";
+ }
+
+ leaf termination-reason {
+ type enumeration {
+ enum "closed" {
+ description
+ "The session was terminated by the client in normal
+ fashion, e.g., by the NETCONF <close-session>
+ protocol operation.";
+ }
+ enum "killed" {
+ description
+ "The session was terminated in abnormal
+ fashion, e.g., by the NETCONF <kill-session>
+ protocol operation.";
+ }
+ enum "dropped" {
+ description
+ "The session was terminated because the transport layer
+ connection was unexpectedly closed.";
+ }
+ enum "timeout" {
+ description
+ "The session was terminated because of inactivity,
+ e.g., waiting for the <hello> message or <rpc>
+ messages.";
+ }
+
+
+
+ enum "bad-hello" {
+ description
+ "The client's <hello> message was invalid.";
+ }
+ enum "other" {
+ description
+ "The session was terminated for some other reason.";
+ }
+ }
+ mandatory true;
+ description
+ "Reason the session was terminated.";
+ }
+ } // notification netconf-session-end
+
+
+ notification netconf-confirmed-commit {
+ description
+ "Generated when a NETCONF server detects that a
+ confirmed-commit event has occurred. Indicates the event
+ and the current state of the confirmed-commit procedure
+ in progress.";
+ reference
+ "RFC 6241, Section 8.4";
+
+ uses common-session-parms {
+ when "confirm-event != 'timeout'";
+ }
+
+ leaf confirm-event {
+ type enumeration {
+ enum "start" {
+ description
+ "The confirmed-commit procedure has started.";
+ }
+ enum "cancel" {
+ description
+ "The confirmed-commit procedure has been canceled,
+ e.g., due to the session being terminated, or an
+ explicit <cancel-commit> operation.";
+ }
+ enum "timeout" {
+ description
+ "The confirmed-commit procedure has been canceled
+ due to the confirm-timeout interval expiring.
+ The common session parameters will not be present
+ in this sub-mode.";
+ }
+
+ enum "extend" {
+ description
+ "The confirmed-commit timeout has been extended,
+ e.g., by a new <confirmed-commit> operation.";
+ }
+ enum "complete" {
+ description
+ "The confirmed-commit procedure has been completed.";
+ }
+ }
+ mandatory true;
+ description
+ "Indicates the event that caused the notification.";
+ }
+
+ leaf timeout {
+ when
+ "../confirm-event = 'start' or ../confirm-event = 'extend'";
+ type uint32;
+ units "seconds";
+ description
+ "The configured timeout value if the event type
+ is 'start' or 'extend'. This value represents
+ the approximate number of seconds from the event
+ time when the 'timeout' event might occur.";
+ }
+ } // notification netconf-confirmed-commit
+
+}
--- /dev/null
+module nc-notifications {
+
+ namespace "urn:ietf:params:xml:ns:netmod:notification";
+ prefix "manageEvent";
+
+ import ietf-yang-types{ prefix yang; }
+ import notifications { prefix ncEvent; }
+
+ organization
+ "IETF NETCONF WG";
+
+ contact
+ "netconf@ietf.org";
+
+ description
+ "Conversion of the 'manageEvent' XSD in the NETCONF
+ Notifications RFC.";
+
+ reference
+ "RFC 5277";
+
+ revision 2008-07-14 {
+ description "RFC 5277 version.";
+ }
+
+ container netconf {
+ description "Top-level element in the notification namespace";
+
+ config false;
+
+ container streams {
+ description
+ "The list of event streams supported by the system. When
+ a query is issued, the returned set of streams is
+ determined based on user privileges.";
+
+ list stream {
+ description
+ "Stream name, description and other information.";
+ key name;
+ min-elements 1;
+
+ leaf name {
+ description
+ "The name of the event stream. If this is the default
+ NETCONF stream, this must have the value 'NETCONF'.";
+ type ncEvent:streamNameType;
+ }
+
+ leaf description {
+ description
+ "A description of the event stream, including such
+ information as the type of events that are sent over
+ this stream.";
+ type string;
+ mandatory true;
+ }
+
+ leaf replaySupport {
+ description
+ "A description of the event stream, including such
+ information as the type of events that are sent over
+ this stream.";
+ type boolean;
+ mandatory true;
+ }
+
+ leaf replayLogCreationTime {
+ description
+ "The timestamp of the creation of the log used to support
+ the replay function on this stream. Note that this might
+ be earlier then the earliest available notification in
+ the log. This object is updated if the log resets for
+ some reason. This object MUST be present if replay is
+ supported.";
+ type yang:date-and-time; // xsd:dateTime is wrong!
+ }
+ }
+ }
+ }
+
+ notification replayComplete {
+ description
+ "This notification is sent to signal the end of a replay
+ portion of a subscription.";
+ }
+
+ notification notificationComplete {
+ description
+ "This notification is sent to signal the end of a notification
+ subscription. It is sent in the case that stopTime was
+ specified during the creation of the subscription..";
+ }
+
+}
--- /dev/null
+module notifications {
+
+ namespace "urn:ietf:params:xml:ns:netconf:notification:1.0";
+ prefix "ncEvent";
+
+ import ietf-yang-types { prefix yang; }
+
+ organization
+ "IETF NETCONF WG";
+
+ contact
+ "netconf@ops.ietf.org";
+
+ description
+ "Conversion of the 'ncEvent' XSD in the
+ NETCONF Notifications RFC.";
+
+ reference
+ "RFC 5277.";
+
+ revision 2008-07-14 {
+ description "RFC 5277 version.";
+ }
+
+ typedef streamNameType {
+ description
+ "The name of an event stream.";
+ type string;
+ }
+
+ rpc create-subscription {
+ description
+ "The command to create a notification subscription. It
+ takes as argument the name of the notification stream
+ and filter. Both of those options limit the content of
+ the subscription. In addition, there are two time-related
+ parameters, startTime and stopTime, which can be used to
+ select the time interval of interest to the notification
+ replay feature.";
+
+ input {
+ leaf stream {
+ description
+ "An optional parameter that indicates which stream of events
+ is of interest. If not present, then events in the default
+ NETCONF stream will be sent.";
+ type streamNameType;
+ default "NETCONF";
+ }
+
+ anyxml filter {
+ description
+ "An optional parameter that indicates which subset of all
+ possible events is of interest. The format of this
+ parameter is the same as that of the filter parameter
+ in the NETCONF protocol operations. If not present,
+ all events not precluded by other parameters will
+ be sent.";
+ }
+
+ leaf startTime {
+ description
+ "A parameter used to trigger the replay feature and
+ indicates that the replay should start at the time
+ specified. If start time is not present, this is not a
+ replay subscription.";
+ type yang:date-and-time;
+ }
+
+ leaf stopTime {
+ // must ". >= ../startTime";
+ description
+ "An optional parameter used with the optional replay
+ feature to indicate the newest notifications of
+ interest. If stop time is not present, the notifications
+ will continue until the subscription is terminated.
+ Must be used with startTime.";
+ type yang:date-and-time;
+ }
+ }
+ }
+}
+
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-subsystem</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>ietf-netconf</artifactId>
+ <packaging>bundle</packaging>
+ <name>${project.artifactId}</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>ietf-inet-types</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package>org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.*</Export-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+module ietf-netconf {
+
+ // the namespace for NETCONF XML definitions is unchanged
+ // from RFC 4741, which this document replaces
+ namespace "urn:ietf:params:xml:ns:netconf:base:1.0";
+
+ prefix nc;
+
+ import ietf-inet-types {
+ prefix inet;
+ }
+
+ organization
+ "IETF NETCONF (Network Configuration) Working Group";
+
+ contact
+ "WG Web: <http://tools.ietf.org/wg/netconf/>
+ WG List: <netconf@ietf.org>
+
+ WG Chair: Bert Wijnen
+ <bertietf@bwijnen.net>
+
+ WG Chair: Mehmet Ersue
+ <mehmet.ersue@nsn.com>
+
+ Editor: Martin Bjorklund
+ <mbj@tail-f.com>
+
+ Editor: Juergen Schoenwaelder
+ <j.schoenwaelder@jacobs-university.de>
+
+ Editor: Andy Bierman
+ <andy.bierman@brocade.com>";
+ description
+ "NETCONF Protocol Data Types and Protocol Operations.
+
+ Copyright (c) 2011 IETF Trust and the persons identified as
+ the document authors. All rights reserved.
+
+ Redistribution and use in source and binary forms, with or
+ without modification, is permitted pursuant to, and subject
+ to the license terms contained in, the Simplified BSD License
+ set forth in Section 4.c of the IETF Trust's Legal Provisions
+ Relating to IETF Documents
+ (http://trustee.ietf.org/license-info).
+
+ This version of this YANG module is part of RFC 6241; see
+ the RFC itself for full legal notices.";
+
+ revision 2011-06-01 {
+ description
+ "Initial revision;";
+ reference
+ "RFC 6241: Network Configuration Protocol";
+ }
+
+ extension get-filter-element-attributes {
+ description
+ "If this extension is present within an 'anyxml'
+ statement named 'filter', which must be conceptually
+ defined within the RPC input section for the <get>
+ and <get-config> protocol operations, then the
+ following unqualified XML attribute is supported
+ within the <filter> element, within a <get> or
+ <get-config> protocol operation:
+
+ type : optional attribute with allowed
+ value strings 'subtree' and 'xpath'.
+ If missing, the default value is 'subtree'.
+
+ If the 'xpath' feature is supported, then the
+ following unqualified XML attribute is
+ also supported:
+
+ select: optional attribute containing a
+ string representing an XPath expression.
+ The 'type' attribute must be equal to 'xpath'
+ if this attribute is present.";
+ }
+
+ // NETCONF capabilities defined as features
+ feature writable-running {
+ description
+ "NETCONF :writable-running capability;
+ If the server advertises the :writable-running
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+ reference "RFC 6241, Section 8.2";
+ }
+
+ feature candidate {
+ description
+ "NETCONF :candidate capability;
+ If the server advertises the :candidate
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+ reference "RFC 6241, Section 8.3";
+ }
+
+ feature confirmed-commit {
+ if-feature candidate;
+ description
+ "NETCONF :confirmed-commit:1.1 capability;
+ If the server advertises the :confirmed-commit:1.1
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+
+ reference "RFC 6241, Section 8.4";
+ }
+
+ feature rollback-on-error {
+ description
+ "NETCONF :rollback-on-error capability;
+ If the server advertises the :rollback-on-error
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+ reference "RFC 6241, Section 8.5";
+ }
+
+ feature validate {
+ description
+ "NETCONF :validate:1.1 capability;
+ If the server advertises the :validate:1.1
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+ reference "RFC 6241, Section 8.6";
+ }
+
+ feature startup {
+ description
+ "NETCONF :startup capability;
+ If the server advertises the :startup
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+ reference "RFC 6241, Section 8.7";
+ }
+
+ feature url {
+ description
+ "NETCONF :url capability;
+ If the server advertises the :url
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+ reference "RFC 6241, Section 8.8";
+ }
+
+ feature xpath {
+ description
+ "NETCONF :xpath capability;
+ If the server advertises the :xpath
+ capability for a session, then this feature must
+ also be enabled for that session. Otherwise,
+ this feature must not be enabled.";
+ reference "RFC 6241, Section 8.9";
+ }
+
+ // NETCONF Simple Types
+
+ typedef session-id-type {
+ type uint32 {
+ range "1..max";
+ }
+ description
+ "NETCONF Session Id";
+ }
+
+ typedef session-id-or-zero-type {
+ type uint32;
+ description
+ "NETCONF Session Id or Zero to indicate none";
+ }
+ typedef error-tag-type {
+ type enumeration {
+ enum in-use {
+ description
+ "The request requires a resource that
+ already is in use.";
+ }
+ enum invalid-value {
+ description
+ "The request specifies an unacceptable value for one
+ or more parameters.";
+ }
+ enum too-big {
+ description
+ "The request or response (that would be generated) is
+ too large for the implementation to handle.";
+ }
+ enum missing-attribute {
+ description
+ "An expected attribute is missing.";
+ }
+ enum bad-attribute {
+ description
+ "An attribute value is not correct; e.g., wrong type,
+ out of range, pattern mismatch.";
+ }
+ enum unknown-attribute {
+ description
+ "An unexpected attribute is present.";
+ }
+ enum missing-element {
+ description
+ "An expected element is missing.";
+ }
+ enum bad-element {
+ description
+ "An element value is not correct; e.g., wrong type,
+ out of range, pattern mismatch.";
+ }
+ enum unknown-element {
+ description
+ "An unexpected element is present.";
+ }
+ enum unknown-namespace {
+ description
+ "An unexpected namespace is present.";
+ }
+ enum access-denied {
+ description
+ "Access to the requested protocol operation or
+ data model is denied because authorization failed.";
+ }
+ enum lock-denied {
+ description
+ "Access to the requested lock is denied because the
+ lock is currently held by another entity.";
+ }
+ enum resource-denied {
+ description
+ "Request could not be completed because of
+ insufficient resources.";
+ }
+ enum rollback-failed {
+ description
+ "Request to roll back some configuration change (via
+ rollback-on-error or <discard-changes> operations)
+ was not completed for some reason.";
+
+ }
+ enum data-exists {
+ description
+ "Request could not be completed because the relevant
+ data model content already exists. For example,
+ a 'create' operation was attempted on data that
+ already exists.";
+ }
+ enum data-missing {
+ description
+ "Request could not be completed because the relevant
+ data model content does not exist. For example,
+ a 'delete' operation was attempted on
+ data that does not exist.";
+ }
+ enum operation-not-supported {
+ description
+ "Request could not be completed because the requested
+ operation is not supported by this implementation.";
+ }
+ enum operation-failed {
+ description
+ "Request could not be completed because the requested
+ operation failed for some reason not covered by
+ any other error condition.";
+ }
+ enum partial-operation {
+ description
+ "This error-tag is obsolete, and SHOULD NOT be sent
+ by servers conforming to this document.";
+ }
+ enum malformed-message {
+ description
+ "A message could not be handled because it failed to
+ be parsed correctly. For example, the message is not
+ well-formed XML or it uses an invalid character set.";
+ }
+ }
+ description "NETCONF Error Tag";
+ reference "RFC 6241, Appendix A";
+ }
+
+ typedef error-severity-type {
+ type enumeration {
+ enum error {
+ description "Error severity";
+ }
+ enum warning {
+ description "Warning severity";
+ }
+ }
+ description "NETCONF Error Severity";
+ reference "RFC 6241, Section 4.3";
+ }
+
+ typedef edit-operation-type {
+ type enumeration {
+ enum merge {
+ description
+ "The configuration data identified by the
+ element containing this attribute is merged
+ with the configuration at the corresponding
+ level in the configuration datastore identified
+ by the target parameter.";
+ }
+ enum replace {
+ description
+ "The configuration data identified by the element
+ containing this attribute replaces any related
+ configuration in the configuration datastore
+ identified by the target parameter. If no such
+ configuration data exists in the configuration
+ datastore, it is created. Unlike a
+ <copy-config> operation, which replaces the
+ entire target configuration, only the configuration
+ actually present in the config parameter is affected.";
+ }
+ enum create {
+ description
+ "The configuration data identified by the element
+ containing this attribute is added to the
+ configuration if and only if the configuration
+ data does not already exist in the configuration
+ datastore. If the configuration data exists, an
+ <rpc-error> element is returned with an
+ <error-tag> value of 'data-exists'.";
+ }
+ enum delete {
+ description
+ "The configuration data identified by the element
+ containing this attribute is deleted from the
+ configuration if and only if the configuration
+ data currently exists in the configuration
+ datastore. If the configuration data does not
+ exist, an <rpc-error> element is returned with
+ an <error-tag> value of 'data-missing'.";
+ }
+ enum remove {
+ description
+ "The configuration data identified by the element
+ containing this attribute is deleted from the
+ configuration if the configuration
+ data currently exists in the configuration
+ datastore. If the configuration data does not
+ exist, the 'remove' operation is silently ignored
+ by the server.";
+ }
+ }
+ default "merge";
+ description "NETCONF 'operation' attribute values";
+ reference "RFC 6241, Section 7.2";
+ }
+
+ // NETCONF Standard Protocol Operations
+
+ rpc get-config {
+ description
+ "Retrieve all or part of a specified configuration.";
+
+ reference "RFC 6241, Section 7.1";
+
+ input {
+ container source {
+ description
+ "Particular configuration to retrieve.";
+
+ choice config-source {
+ mandatory true;
+ description
+ "The configuration to retrieve.";
+ leaf candidate {
+ if-feature candidate;
+ type empty;
+ description
+ "The candidate configuration is the config source.";
+ }
+ leaf running {
+ type empty;
+ description
+ "The running configuration is the config source.";
+ }
+ leaf startup {
+ if-feature startup;
+ type empty;
+ description
+ "The startup configuration is the config source.
+ This is optional-to-implement on the server because
+ not all servers will support filtering for this
+ datastore.";
+ }
+ }
+ }
+
+ anyxml filter {
+ description
+ "Subtree or XPath filter to use.";
+ nc:get-filter-element-attributes;
+ }
+ }
+
+ output {
+ anyxml data {
+ description
+ "Copy of the source datastore subset that matched
+ the filter criteria (if any). An empty data container
+ indicates that the request did not produce any results.";
+ }
+ }
+ }
+
+ rpc edit-config {
+ description
+ "The <edit-config> operation loads all or part of a specified
+ configuration to the specified target configuration.";
+
+ reference "RFC 6241, Section 7.2";
+
+ input {
+ container target {
+ description
+ "Particular configuration to edit.";
+
+ choice config-target {
+ mandatory true;
+ description
+ "The configuration target.";
+
+ leaf candidate {
+ if-feature candidate;
+ type empty;
+ description
+ "The candidate configuration is the config target.";
+ }
+ leaf running {
+ if-feature writable-running;
+ type empty;
+ description
+ "The running configuration is the config source.";
+ }
+ }
+ }
+
+ leaf default-operation {
+ type enumeration {
+ enum merge {
+ description
+ "The default operation is merge.";
+ }
+ enum replace {
+ description
+ "The default operation is replace.";
+ }
+ enum none {
+ description
+ "There is no default operation.";
+ }
+ }
+ default "merge";
+ description
+ "The default operation to use.";
+ }
+
+ leaf test-option {
+ if-feature validate;
+ type enumeration {
+ enum test-then-set {
+ description
+ "The server will test and then set if no errors.";
+ }
+ enum set {
+ description
+ "The server will set without a test first.";
+ }
+
+ enum test-only {
+ description
+ "The server will only test and not set, even
+ if there are no errors.";
+ }
+ }
+ default "test-then-set";
+ description
+ "The test option to use.";
+ }
+
+ leaf error-option {
+ type enumeration {
+ enum stop-on-error {
+ description
+ "The server will stop on errors.";
+ }
+ enum continue-on-error {
+ description
+ "The server may continue on errors.";
+ }
+ enum rollback-on-error {
+ description
+ "The server will roll back on errors.
+ This value can only be used if the 'rollback-on-error'
+ feature is supported.";
+ }
+ }
+ default "stop-on-error";
+ description
+ "The error option to use.";
+ }
+
+ choice edit-content {
+ mandatory true;
+ description
+ "The content for the edit operation.";
+
+ anyxml config {
+ description
+ "Inline Config content.";
+ }
+ leaf url {
+ if-feature url;
+ type inet:uri;
+ description
+ "URL-based config content.";
+ }
+ }
+ }
+ }
+
+ rpc copy-config {
+ description
+ "Create or replace an entire configuration datastore with the
+ contents of another complete configuration datastore.";
+
+ reference "RFC 6241, Section 7.3";
+
+ input {
+ container target {
+ description
+ "Particular configuration to copy to.";
+
+ choice config-target {
+ mandatory true;
+ description
+ "The configuration target of the copy operation.";
+
+ leaf candidate {
+ if-feature candidate;
+ type empty;
+ description
+ "The candidate configuration is the config target.";
+ }
+ leaf running {
+ if-feature writable-running;
+ type empty;
+ description
+ "The running configuration is the config target.
+ This is optional-to-implement on the server.";
+ }
+ leaf startup {
+ if-feature startup;
+ type empty;
+ description
+ "The startup configuration is the config target.";
+ }
+ leaf url {
+ if-feature url;
+ type inet:uri;
+ description
+ "The URL-based configuration is the config target.";
+ }
+ }
+ }
+
+ container source {
+ description
+ "Particular configuration to copy from.";
+
+ choice config-source {
+ mandatory true;
+ description
+ "The configuration source for the copy operation.";
+
+ leaf candidate {
+ if-feature candidate;
+ type empty;
+ description
+ "The candidate configuration is the config source.";
+ }
+ leaf running {
+ type empty;
+ description
+ "The running configuration is the config source.";
+ }
+ leaf startup {
+ if-feature startup;
+ type empty;
+ description
+ "The startup configuration is the config source.";
+ }
+ leaf url {
+ if-feature url;
+ type inet:uri;
+ description
+ "The URL-based configuration is the config source.";
+ }
+ anyxml config {
+ description
+ "Inline Config content: <config> element. Represents
+ an entire configuration datastore, not
+ a subset of the running datastore.";
+ }
+ }
+ }
+ }
+ }
+
+ rpc delete-config {
+ description
+ "Delete a configuration datastore.";
+
+ reference "RFC 6241, Section 7.4";
+
+ input {
+ container target {
+ description
+ "Particular configuration to delete.";
+
+ choice config-target {
+ mandatory true;
+ description
+ "The configuration target to delete.";
+
+ leaf startup {
+ if-feature startup;
+ type empty;
+ description
+ "The startup configuration is the config target.";
+ }
+ leaf url {
+ if-feature url;
+ type inet:uri;
+ description
+ "The URL-based configuration is the config target.";
+ }
+ }
+ }
+ }
+ }
+
+ rpc lock {
+ description
+ "The lock operation allows the client to lock the configuration
+ system of a device.";
+
+ reference "RFC 6241, Section 7.5";
+
+ input {
+ container target {
+ description
+ "Particular configuration to lock.";
+
+ choice config-target {
+ mandatory true;
+ description
+ "The configuration target to lock.";
+
+ leaf candidate {
+ if-feature candidate;
+ type empty;
+ description
+ "The candidate configuration is the config target.";
+ }
+ leaf running {
+ type empty;
+ description
+ "The running configuration is the config target.";
+ }
+ leaf startup {
+ if-feature startup;
+ type empty;
+ description
+ "The startup configuration is the config target.";
+ }
+ }
+ }
+ }
+ }
+
+ rpc unlock {
+ description
+ "The unlock operation is used to release a configuration lock,
+ previously obtained with the 'lock' operation.";
+
+ reference "RFC 6241, Section 7.6";
+
+ input {
+ container target {
+ description
+ "Particular configuration to unlock.";
+
+ choice config-target {
+ mandatory true;
+ description
+ "The configuration target to unlock.";
+
+ leaf candidate {
+ if-feature candidate;
+ type empty;
+ description
+ "The candidate configuration is the config target.";
+ }
+ leaf running {
+ type empty;
+ description
+ "The running configuration is the config target.";
+ }
+ leaf startup {
+ if-feature startup;
+ type empty;
+ description
+ "The startup configuration is the config target.";
+ }
+ }
+ }
+ }
+ }
+
+ rpc get {
+ description
+ "Retrieve running configuration and device state information.";
+
+ reference "RFC 6241, Section 7.7";
+
+ input {
+ anyxml filter {
+ description
+ "This parameter specifies the portion of the system
+ configuration and state data to retrieve.";
+ nc:get-filter-element-attributes;
+ }
+ }
+
+ output {
+ anyxml data {
+ description
+ "Copy of the running datastore subset and/or state
+ data that matched the filter criteria (if any).
+ An empty data container indicates that the request did not
+ produce any results.";
+ }
+ }
+ }
+
+ rpc close-session {
+ description
+ "Request graceful termination of a NETCONF session.";
+
+ reference "RFC 6241, Section 7.8";
+ }
+
+ rpc kill-session {
+ description
+ "Force the termination of a NETCONF session.";
+
+ reference "RFC 6241, Section 7.9";
+
+ input {
+ leaf session-id {
+ type session-id-type;
+ mandatory true;
+ description
+ "Particular session to kill.";
+ }
+ }
+ }
+
+ rpc commit {
+ if-feature candidate;
+
+ description
+ "Commit the candidate configuration as the device's new
+ current configuration.";
+
+ reference "RFC 6241, Section 8.3.4.1";
+
+ input {
+ leaf confirmed {
+ if-feature confirmed-commit;
+ type empty;
+ description
+ "Requests a confirmed commit.";
+ reference "RFC 6241, Section 8.3.4.1";
+ }
+
+ leaf confirm-timeout {
+ if-feature confirmed-commit;
+ type uint32 {
+ range "1..max";
+ }
+ units "seconds";
+ default "600"; // 10 minutes
+ description
+ "The timeout interval for a confirmed commit.";
+ reference "RFC 6241, Section 8.3.4.1";
+ }
+
+ leaf persist {
+ if-feature confirmed-commit;
+ type string;
+ description
+ "This parameter is used to make a confirmed commit
+ persistent. A persistent confirmed commit is not aborted
+ if the NETCONF session terminates. The only way to abort
+ a persistent confirmed commit is to let the timer expire,
+ or to use the <cancel-commit> operation.
+
+ The value of this parameter is a token that must be given
+ in the 'persist-id' parameter of <commit> or
+ <cancel-commit> operations in order to confirm or cancel
+ the persistent confirmed commit.
+
+ The token should be a random string.";
+ reference "RFC 6241, Section 8.3.4.1";
+ }
+
+ leaf persist-id {
+ if-feature confirmed-commit;
+ type string;
+ description
+ "This parameter is given in order to commit a persistent
+ confirmed commit. The value must be equal to the value
+ given in the 'persist' parameter to the <commit> operation.
+ If it does not match, the operation fails with an
+ 'invalid-value' error.";
+ reference "RFC 6241, Section 8.3.4.1";
+ }
+
+ }
+ }
+
+ rpc discard-changes {
+ if-feature candidate;
+
+ description
+ "Revert the candidate configuration to the current
+ running configuration.";
+ reference "RFC 6241, Section 8.3.4.2";
+ }
+
+ rpc cancel-commit {
+ if-feature confirmed-commit;
+ description
+ "This operation is used to cancel an ongoing confirmed commit.
+ If the confirmed commit is persistent, the parameter
+ 'persist-id' must be given, and it must match the value of the
+ 'persist' parameter.";
+ reference "RFC 6241, Section 8.4.4.1";
+
+ input {
+ leaf persist-id {
+ type string;
+ description
+ "This parameter is given in order to cancel a persistent
+ confirmed commit. The value must be equal to the value
+ given in the 'persist' parameter to the <commit> operation.
+ If it does not match, the operation fails with an
+ 'invalid-value' error.";
+ }
+ }
+ }
+
+ rpc validate {
+ if-feature validate;
+
+ description
+ "Validates the contents of the specified configuration.";
+
+ reference "RFC 6241, Section 8.6.4.1";
+
+ input {
+ container source {
+ description
+ "Particular configuration to validate.";
+
+ choice config-source {
+ mandatory true;
+ description
+ "The configuration source to validate.";
+
+ leaf candidate {
+ if-feature candidate;
+ type empty;
+ description
+ "The candidate configuration is the config source.";
+ }
+ leaf running {
+ type empty;
+ description
+ "The running configuration is the config source.";
+ }
+ leaf startup {
+ if-feature startup;
+ type empty;
+ description
+ "The startup configuration is the config source.";
+ }
+ leaf url {
+ if-feature url;
+ type inet:uri;
+ description
+ "The URL-based configuration is the config source.";
+ }
+ anyxml config {
+ description
+ "Inline Config content: <config> element. Represents
+ an entire configuration datastore, not
+ a subset of the running datastore.";
+ }
+ }
+ }
+ }
+ }
+
+}
final SimulatedEditConfig sEditConfig = new SimulatedEditConfig(String.valueOf(currentSessionId), storage);
final SimulatedGetConfig sGetConfig = new SimulatedGetConfig(String.valueOf(currentSessionId), storage);
final SimulatedCommit sCommit = new SimulatedCommit(String.valueOf(currentSessionId));
- return Sets.<NetconfOperation>newHashSet(sGet, sGetConfig, sEditConfig, sCommit);
+ final SimulatedLock sLock = new SimulatedLock(String.valueOf(currentSessionId));
+ final SimulatedUnLock sUnlock = new SimulatedUnLock(String.valueOf(currentSessionId));
+ return Sets.<NetconfOperation>newHashSet(sGet, sGetConfig, sEditConfig, sCommit, sLock, sUnlock);
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+class SimulatedLock extends AbstractConfigNetconfOperation {
+
+ SimulatedLock(final String netconfSessionIdForReporting) {
+ super(null, netconfSessionIdForReporting);
+ }
+
+ @Override
+ protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException {
+ return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
+ }
+
+ @Override
+ protected String getOperationName() {
+ return "lock";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+class SimulatedUnLock extends AbstractConfigNetconfOperation {
+
+ SimulatedUnLock(final String netconfSessionIdForReporting) {
+ super(null, netconfSessionIdForReporting);
+ }
+
+ @Override
+ protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException {
+ return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
+ }
+
+ @Override
+ protected String getOperationName() {
+ return "unlock";
+ }
+}
<version>0.3.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<modules>
<module>netconf-api</module>
<packaging>pom</packaging>
<name>controller</name> <!-- Used by Sonar to set project name -->
- <prerequisites>
- <maven>3.0</maven>
- </prerequisites>
-
<modules>
<!-- md-sal -->