<module>${artifactId}-features</module>
<module>${artifactId}-artifacts</module>
</modules>
+ <!-- DO NOT install or deploy the repo root pom as it's only needed to initiate a build -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
private void onRecoveredSnapshot(SnapshotOffer offer) {
if(LOG.isDebugEnabled()) {
- LOG.debug("SnapshotOffer called..");
+ LOG.debug("{}: SnapshotOffer called..", persistenceId());
}
initRecoveryTimer();
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
}
replicatedLog.append(logEntry);
private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
- context.getLastApplied() + 1, ale.getToIndex());
+ LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
}
for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
ApplyState applyState = (ApplyState) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState.getReplicatedLogEntry().getData());
}
} else if (message instanceof ApplyLogEntries){
ApplyLogEntries ale = (ApplyLogEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
}
persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
@Override
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
if(LOG.isDebugEnabled()) {
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
snapshot.getLastAppliedTerm()
);
}
} else if (message instanceof SaveSnapshotSuccess) {
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("SaveSnapshotSuccess received for snapshot");
+ LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
long sequenceNumber = success.metadata().sequenceNr();
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
- LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+ LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId());
context.getReplicatedLog().snapshotRollback();
- LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+ LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle. " +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
} else if (message instanceof CaptureSnapshot) {
- LOG.info("CaptureSnapshot received by actor");
+ LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
if(captureSnapshot == null) {
captureSnapshot = (CaptureSnapshot)message;
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: message: {}", message.getClass());
+ LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
}
}
context.getTermInformation().getCurrentTerm(), data);
if(LOG.isDebugEnabled()) {
- LOG.debug("Persist data {}", replicatedLogEntry);
+ LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
}
final RaftActorContext raftContext = getRaftActorContext();
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
} else {
- LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+ LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+ persistenceId(), getId());
}
} else if (clientActor != null) {
// Send message for replication
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
- leaderId, peerAddress);
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+ persistenceId(), leaderId, peerAddress);
}
return peerAddress;
}
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+ LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
persistence().saveSnapshot(sn);
- LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", captureSnapshot.getLastAppliedIndex(),
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snapshotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
final Procedure<ReplicatedLogEntry> callback) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+ LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
dataSizeSinceLastSnapshot = 0;
- LOG.info("Initiating Snapshot Capture..");
+ LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ",
- context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+ LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+ persistenceId(), context.getLastApplied());
+ LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+ lastAppliedIndex);
+ LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+ lastAppliedTerm);
}
// send a CaptureSnapshot to self to make the expensive operation async.
@Override public void update(long currentTerm, String votedFor) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
}
this.currentTerm = currentTerm;
this.votedFor = votedFor;
leaderId = context.getId();
- LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
+ LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
return this;
if(! appendEntriesReply.isSuccess()) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntriesReply.toString());
+ LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
}
}
followerToLog.get(followerId);
if(followerLogInformation == null){
- LOG.error("Unknown follower {}", followerId);
+ LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
return this;
}
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
+ LOG.debug("{}: InstallSnapshotReply received, " +
"last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
+ context.getId(), reply.getChunkIndex(), followerId,
context.getReplicatedLog().getSnapshotIndex() + 1
);
}
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
- followerToLog.get(followerId).getNextIndex());
+ LOG.debug("{}: followerToLog.get(followerId).getNextIndex()={}",
+ context.getId(), followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
followerToSnapshot.markSendStatus(true);
}
} else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
+ LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+ context.getId(), reply.getChunkIndex());
followerToSnapshot.markSendStatus(false);
}
} else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
+ LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+ context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
);
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
+ LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
}
// Create a tracker entry we will use this later to notify the
// then snapshot should be sent
if(LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
+ LOG.debug("{}: InitiateInstallSnapshot to follower: {}, " +
+ "follower-nextIndex: {}, leader-snapshot-index: {}, " +
+ "leader-last-index: {}", context.getId(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
}
actor().tell(new InitiateInstallSnapshot(), actor());
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", e.getKey());
+ LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
// on every install snapshot, we try to capture the snapshot.
// Once a capture is going on, another one issued will get ignored by RaftActor.
private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
).toSerializable(),
actor()
);
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ context.getId(), followerActor.path(),
+ mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
}
} catch (IOException e) {
- LOG.error(e, "InstallSnapshot failed for Leader.");
+ LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
}
}
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
if (LOG.isDebugEnabled()) {
- LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
}
return nextChunk;
}
* snapshot chunks
*/
protected class FollowerToSnapshot {
- private ByteString snapshotBytes;
+ private final ByteString snapshotBytes;
private int offset = 0;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
private int replyReceivedForOffset;
// if replyStatus is false, the previous chunk is attempted
private boolean replyStatus = false;
private int chunkIndex;
- private int totalChunks;
+ private final int totalChunks;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
+ LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+ context.getId(), size, totalChunks);
}
replyReceivedForOffset = -1;
chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
if(LOG.isDebugEnabled()) {
- LOG.debug("length={}, offset={},size={}",
+ LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
snapshotLength, start, size);
}
ByteString substring = getSnapshotBytes().substring(start, start + size);
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Cannot append entries because sender term {} is less than {}",
- appendEntries.getTerm(), currentTerm());
+ LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
+ context.getId(), appendEntries.getTerm(), currentTerm());
}
sender.tell(
RequestVote requestVote) {
if(LOG.isDebugEnabled()) {
- LOG.debug(requestVote.toString());
+ LOG.debug("{}: Received {}", context.getId(), requestVote);
}
boolean grantVote = false;
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
LOG.warning(
- "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
+ "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ context.getId(), i, i, index);
break;
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Setting last applied to {}", newLastApplied);
+ LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
}
context.setLastApplied(newLastApplied);
try {
close();
} catch (Exception e) {
- LOG.error(e, "Failed to close behavior : {}", this.state());
+ LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
}
return behavior;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import java.util.Set;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import java.util.Set;
-
/**
* The behavior of a RaftActor when it is in the CandidateState
* <p/>
peers = context.getPeerAddresses().keySet();
if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Candidate has following peers: {}", peers);
+ LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers);
}
votesRequired = getMajorityVoteCount(peers.size());
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
return this;
RaftRPC rpc = (RaftRPC) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+ LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc,
+ context.getTermInformation().getCurrentTerm());
}
// If RPC request or response contains term T > currentTerm:
context.getId());
if(LOG.isDebugEnabled()) {
- LOG.debug("Starting new term {}", (currentTerm + 1));
+ LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1));
}
// Request for a vote
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
}
}
// it's log.
if(LOG.isDebugEnabled()) {
- LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
// prevLogIndex entry was not found in it's log
if(LOG.isDebugEnabled()) {
- LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+ context.getId(), appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , prevLogTerm
+ "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , context.getId(), prevLogTerm
, appendEntries.getPrevLogTerm());
}
} else {
// We found that the log was out of sync so just send a negative
// reply and return
if(LOG.isDebugEnabled()) {
- LOG.debug("Follower ({}) is out-of-sync, " +
+ LOG.debug("{}: Follower ({}) is out-of-sync, " +
"so sending negative reply, lastIndex():{}, lastTerm():{}",
- context.getId(), lastIndex(), lastTerm()
+ context.getId(), context.getId(), lastIndex(), lastTerm()
);
}
sender.tell(
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Number of entries to be appended = {}", appendEntries.getEntries().size()
- );
+ LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+ appendEntries.getEntries().size());
}
// 3. If an existing entry conflicts with a new one (same index
}
if(LOG.isDebugEnabled()) {
- LOG.debug(
- "Removing entries from log starting at {}", matchEntry.getIndex()
- );
+ LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+ matchEntry.getIndex());
}
// Entries do not match so remove all subsequent entries
}
if(LOG.isDebugEnabled()) {
- LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
- );
+ LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+ (addEntriesFrom + lastIndex()));
}
// 4. Append any new entries not already in the log
i < appendEntries.getEntries().size(); i++) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ LOG.debug("{}: Append entry to log {}", context.getId(),
+ appendEntries.getEntries().get(i).getData());
}
context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+ LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
}
}
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Commit index set to {}", context.getCommitIndex());
+ LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
}
}
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("applyLogToStateMachine, " +
+ LOG.debug("{}: applyLogToStateMachine, " +
"appendEntries.getLeaderCommit():{}," +
- "context.getLastApplied():{}, lastIndex():{}",
+ "context.getLastApplied():{}, lastIndex():{}", context.getId(),
appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
);
}
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshot received by follower " +
- "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ LOG.debug("{}: InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
);
}
snapshotTracker = null;
} catch (Exception e){
-
- LOG.error(e, "Exception in InstallSnapshot of follower:");
+ LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
if (originalMessage instanceof IsolatedLeaderCheck) {
if (isLeaderIsolated()) {
- LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
- minIsolatedLeaderPeerCount, leaderId);
+ LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), minIsolatedLeaderPeerCount, leaderId);
return switchBehavior(new IsolatedLeader(context));
}
}
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
- <groupId>org.opendaylight.controller.model</groupId>
- <artifactId>model-flow-service</artifactId>
- <scope>provided</scope>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-test-model</artifactId>
+ <version>${mdsal.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>opendaylight-l2-types</artifactId>
</dependency>
</dependencies>
}
+ /**
+ * @return option containing models for testing purposes
+ */
+ public static Option salTestModelBundles() {
+ return new DefaultCompositeOption( //
+ mavenBundle(CONTROLLER, "sal-test-model").versionAsInProject()
+ );
+
+ }
+
public static Option baseModelBundles() {
return new DefaultCompositeOption( //
mavenBundle(YANGTOOLS_MODELS, "yang-ext").versionAsInProject(), // //
mavenBundle(YANGTOOLS_MODELS, "ietf-inet-types").versionAsInProject(), // //
mavenBundle(YANGTOOLS_MODELS, "ietf-yang-types").versionAsInProject(), // //
- mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject(), // //
- mavenBundle(CONTROLLER_MODELS, "model-inventory").versionAsInProject());
+ mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject() // //
+ );
}
public static Option junitAndMockitoBundles() {
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles;
-import static org.opendaylight.controller.test.sal.binding.it.TestHelper.flowCapableModelBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.salTestModelBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.junitAndMockitoBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
configMinumumBundles(),
// BASE Models
baseModelBundles(),
- flowCapableModelBundles(),
+ salTestModelBundles(),
// Set fail if unresolved bundle present
systemProperty("pax.exam.osgi.unresolved.fail").value("true"),
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import com.google.inject.Inject;
import java.util.concurrent.Future;
-import org.junit.Before;
-import org.junit.Ignore;
+
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import com.google.inject.Inject;
+
+/**
+ * Covers creating, reading and deleting an item in the data store.
+ */
public class DataServiceTest extends AbstractTest {
protected DataBrokerService consumerDataService;
-
@Inject
Broker broker2;
- @Before
- public void setUp() throws Exception {
- }
-
- /*
+ /**
- *
- * Ignored this, because classes here are constructed from
- * very different class loader as MD-SAL is run into,
- * this is code is run from different classloader.
- *
+ * Tests creating, reading and deleting an item in the data store.
+ *
+ * @throws Exception
 */
@Test
- @Ignore
public void test() throws Exception {
BindingAwareConsumer consumer1 = new BindingAwareConsumer() {
consumerDataService = session.getSALService(DataBrokerService.class);
}
};
- broker.registerConsumer(consumer1, getBundleContext());
+ broker.registerConsumer(consumer1);
assertNotNull(consumerDataService);
DataModificationTransaction transaction = consumerDataService.beginTransaction();
assertNotNull(transaction);
- InstanceIdentifier<Node> node1 = createNodeRef("0");
- DataObject node = consumerDataService.readConfigurationData(node1);
+ InstanceIdentifier<UnorderedList> node1 = createNodeRef("0");
+ DataObject node = consumerDataService.readConfigurationData(node1);
assertNull(node);
- Node nodeData1 = createNode("0");
+ UnorderedList nodeData1 = createNode("0");
transaction.putConfigurationData(node1, nodeData1);
Future<RpcResult<TransactionStatus>> commitResult = transaction.commit();
assertNotNull(result.getResult());
assertEquals(TransactionStatus.COMMITED, result.getResult());
- Node readedData = (Node) consumerDataService.readConfigurationData(node1);
+ UnorderedList readedData = (UnorderedList) consumerDataService.readConfigurationData(node1);
assertNotNull(readedData);
assertEquals(nodeData1.getKey(), readedData.getKey());
DataModificationTransaction transaction2 = consumerDataService.beginTransaction();
- assertNotNull(transaction);
+ assertNotNull(transaction2);
transaction2.removeConfigurationData(node1);
DataObject readedData2 = consumerDataService.readConfigurationData(node1);
assertNull(readedData2);
-
-
}
- private static InstanceIdentifier<Node> createNodeRef(final String string) {
- NodeKey key = new NodeKey(new NodeId(string));
- return InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
+ private static InstanceIdentifier<UnorderedList> createNodeRef(final String string) {
+ UnorderedListKey key = new UnorderedListKey(string);
+ return InstanceIdentifier.builder(Lists.class).child(UnorderedContainer.class).child(UnorderedList.class, key).build();
}
- private static Node createNode(final String string) {
- NodeBuilder ret = new NodeBuilder();
- NodeId id = new NodeId(string);
- ret.setKey(new NodeKey(id));
- ret.setId(id);
+ private static UnorderedList createNode(final String string) {
+ UnorderedListBuilder ret = new UnorderedListBuilder();
+ UnorderedListKey nodeKey = new UnorderedListKey(string);
+ ret.setKey(nodeKey);
+ ret.setName("name of " + string);
return ret.build();
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.NotificationService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OpendaylightTestNotificationListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotificationBuilder;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-@Ignore
+/**
+ * Covers registering of a notification listener, publishing of a notification and receiving of the notification.
+ */
public class NotificationTest extends AbstractTest {
- private final FlowListener listener1 = new FlowListener();
- private final FlowListener listener2 = new FlowListener();
+ private static final Logger LOG = LoggerFactory
+ .getLogger(NotificationTest.class);
- private ListenerRegistration<NotificationListener> listener1Reg;
- private ListenerRegistration<NotificationListener> listener2Reg;
+ protected final NotificationTestListener listener1 = new NotificationTestListener();
+ protected final NotificationTestListener listener2 = new NotificationTestListener();
- private NotificationProviderService notifyProviderService;
+ protected ListenerRegistration<NotificationListener> listener1Reg;
+ protected ListenerRegistration<NotificationListener> listener2Reg;
- @Before
- public void setUp() throws Exception {
- }
+ protected NotificationProviderService notifyProviderService;
+ /**
+ * test of delivering of notification
+ * @throws Exception
+ */
@Test
public void notificationTest() throws Exception {
- /**
- *
- * The registration of the Provider 1.
- *
- */
+ LOG.info("The registration of the Provider 1.");
AbstractTestProvider provider1 = new AbstractTestProvider() {
@Override
public void onSessionInitiated(ProviderContext session) {
};
// registerProvider method calls onSessionInitiated method above
- broker.registerProvider(provider1, getBundleContext());
+ broker.registerProvider(provider1);
assertNotNull(notifyProviderService);
- /**
- *
- * The registration of the Consumer 1. It retrieves Notification Service
- * from MD-SAL and registers SalFlowListener as notification listener
- *
- */
+ LOG.info("The registration of the Consumer 1. It retrieves Notification Service "
+ + "from MD-SAL and registers OpendaylightTestNotificationListener as notification listener");
BindingAwareConsumer consumer1 = new BindingAwareConsumer() {
@Override
public void onSessionInitialized(ConsumerContext session) {
}
};
// registerConsumer method calls onSessionInitialized method above
- broker.registerConsumer(consumer1, getBundleContext());
+ broker.registerConsumer(consumer1);
assertNotNull(listener1Reg);
- /**
- * The notification of type FlowAdded with cookie ID 0 is created. The
- * delay 100ms to make sure that the notification was delivered to
- * listener.
- */
- notifyProviderService.publish(flowAdded(0));
+ LOG.info("The notification of type OutOfPixieDustNotification with reason 'rainy day' is created. The "
+ + "delay 100ms to make sure that the notification was delivered to "
+ + "listener.");
+ notifyProviderService.publish(noDustNotification("rainy day", 42));
Thread.sleep(100);
/**
* Check that one notification was delivered and has correct cookie.
*
*/
- assertEquals(1, listener1.addedFlows.size());
- assertEquals(0, listener1.addedFlows.get(0).getCookie().getValue().intValue());
+ assertEquals(1, listener1.notificationBag.size());
+ assertEquals("rainy day", listener1.notificationBag.get(0).getReason());
+ assertEquals(42, listener1.notificationBag.get(0).getDaysTillNewDust().intValue());
- /**
- * The registration of the Consumer 2. SalFlowListener is registered
- * registered as notification listener.
- */
+ LOG.info("The registration of the Consumer 2. OpendaylightTestNotificationListener is "
+ + "registered as notification listener.");
BindingAwareProvider provider = new BindingAwareProvider() {
@Override
};
// registerConsumer method calls onSessionInitialized method above
- broker.registerProvider(provider, getBundleContext());
+ broker.registerProvider(provider);
- /**
- * 3 notifications are published
- */
- notifyProviderService.publish(flowAdded(5));
- notifyProviderService.publish(flowAdded(10));
- notifyProviderService.publish(flowAdded(2));
+ LOG.info("3 notifications are published");
+ notifyProviderService.publish(noDustNotification("rainy day", 5));
+ notifyProviderService.publish(noDustNotification("rainy day", 10));
+ notifyProviderService.publish(noDustNotification("tax collector", 2));
/**
* The delay 100ms to make sure that the notifications were delivered to
* received 4 in total, second 3 in total).
*
*/
- assertEquals(4, listener1.addedFlows.size());
- assertEquals(3, listener2.addedFlows.size());
+ assertEquals(4, listener1.notificationBag.size());
+ assertEquals(3, listener2.notificationBag.size());
/**
* The second listener is closed (unregistered)
*/
listener2Reg.close();
- /**
- *
- * The notification 5 is published
- */
- notifyProviderService.publish(flowAdded(10));
+ LOG.info("The notification 5 is published");
+ notifyProviderService.publish(noDustNotification("entomologist hunt", 10));
/**
* The delay 100ms to make sure that the notification was delivered to
* second consumer because its listener was unregistered.
*
*/
- assertEquals(5, listener1.addedFlows.size());
- assertEquals(3, listener2.addedFlows.size());
+ assertEquals(5, listener1.notificationBag.size());
+ assertEquals(3, listener2.notificationBag.size());
}
/**
- * Creates instance of the type FlowAdded. Only cookie value is set. It is
+ * Creates instance of the type OutOfPixieDustNotification. It is
* used only for testing purpose.
*
- * @param i
- * cookie value
- * @return instance of the type FlowAdded
+ * @param reason reason of the pixie dust shortage
+ * @param days number of days till new dust arrives
+ * @return instance of the type OutOfPixieDustNotification
*/
- public static FlowAdded flowAdded(int i) {
- FlowAddedBuilder ret = new FlowAddedBuilder();
- ret.setCookie(new FlowCookie(BigInteger.valueOf(i)));
+ public static OutOfPixieDustNotification noDustNotification(String reason, int days) {
+ OutOfPixieDustNotificationBuilder ret = new OutOfPixieDustNotificationBuilder();
+ ret.setReason(reason).setDaysTillNewDust(days);
return ret.build();
}
/**
*
* Implements
- * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
- * SalFlowListener} and contains attributes which keep lists of objects of
- * the type
- * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819. NodeFlow
- * NodeFlow}. The lists are defined for flows which were added, removed or
- * updated.
+ * {@link OpendaylightTestNotificationListener} and contains attributes which keep lists of objects of
+ * the type {@link OutOfPixieDustNotification}.
*/
- private static class FlowListener implements SalFlowListener {
-
- List<FlowAdded> addedFlows = new ArrayList<>();
- List<FlowRemoved> removedFlows = new ArrayList<>();
- List<FlowUpdated> updatedFlows = new ArrayList<>();
-
- @Override
- public void onFlowAdded(FlowAdded notification) {
- addedFlows.add(notification);
- }
-
- @Override
- public void onFlowRemoved(FlowRemoved notification) {
- removedFlows.add(notification);
- };
-
- @Override
- public void onFlowUpdated(FlowUpdated notification) {
- updatedFlows.add(notification);
- }
-
- @Override
- public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
- // TODO Auto-generated method stub
-
- }
+ public static class NotificationTestListener implements OpendaylightTestNotificationListener {
- @Override
- public void onNodeErrorNotification(NodeErrorNotification notification) {
- // TODO Auto-generated method stub
-
- }
+ List<OutOfPixieDustNotification> notificationBag = new ArrayList<>();
@Override
- public void onNodeExperimenterErrorNotification(
- NodeExperimenterErrorNotification notification) {
- // TODO Auto-generated method stub
-
+ public void onOutOfPixieDustNotification(OutOfPixieDustNotification arg0) {
+ notificationBag.add(arg0);
}
}
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import java.math.BigInteger;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.OpendaylightTestRoutedRpcService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.TestContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Covers routed RPC creation, registration, invocation and unregistration.
+ */
public class RoutedServiceTest extends AbstractTest {
- private SalFlowService salFlowService1;
- private SalFlowService salFlowService2;
+ private static final Logger LOG = LoggerFactory
+ .getLogger(RoutedServiceTest.class);
- private SalFlowService consumerService;
+ protected OpendaylightTestRoutedRpcService odlRoutedService1;
+ protected OpendaylightTestRoutedRpcService odlRoutedService2;
- private RoutedRpcRegistration<SalFlowService> firstReg;
- private RoutedRpcRegistration<SalFlowService> secondReg;
+ protected OpendaylightTestRoutedRpcService consumerService;
+ protected RoutedRpcRegistration<OpendaylightTestRoutedRpcService> firstReg;
+ protected RoutedRpcRegistration<OpendaylightTestRoutedRpcService> secondReg;
+
+ /**
+ * prepare mocks
+ */
@Before
- public void setUp() throws Exception {
- salFlowService1 = mock(SalFlowService.class, "First Flow Service");
- salFlowService2 = mock(SalFlowService.class, "Second Flow Service");
+ public void setUp() {
+ odlRoutedService1 = mock(OpendaylightTestRoutedRpcService.class, "First Flow Service");
+ odlRoutedService2 = mock(OpendaylightTestRoutedRpcService.class, "Second Flow Service");
}
@Test
assertNotNull(getBroker());
BindingAwareProvider provider1 = new AbstractTestProvider() {
-
@Override
public void onSessionInitiated(ProviderContext session) {
assertNotNull(session);
- firstReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService1);
+ firstReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService1);
}
};
- /**
- * Register provider 1 with first implementation of SalFlowService -
- * service1
- *
- */
- broker.registerProvider(provider1, getBundleContext());
+ LOG.info("Register provider 1 with first implementation of routeSimpleService - service1");
+ broker.registerProvider(provider1);
assertNotNull("Registration should not be null", firstReg);
- assertSame(salFlowService1, firstReg.getInstance());
+ assertSame(odlRoutedService1, firstReg.getInstance());
BindingAwareProvider provider2 = new AbstractTestProvider() {
-
@Override
public void onSessionInitiated(ProviderContext session) {
assertNotNull(session);
- secondReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService2);
+ secondReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService2);
}
};
- /**
- * Register provider 2 with first implementation of SalFlowService -
- * service2
- *
- */
- broker.registerProvider(provider2, getBundleContext());
+ LOG.info("Register provider 2 with second implementation of routeSimpleService - service2");
+ broker.registerProvider(provider2);
assertNotNull("Registration should not be null", firstReg);
- assertSame(salFlowService2, secondReg.getInstance());
+ assertSame(odlRoutedService2, secondReg.getInstance());
assertNotSame(secondReg, firstReg);
BindingAwareConsumer consumer = new BindingAwareConsumer() {
@Override
public void onSessionInitialized(ConsumerContext session) {
- consumerService = session.getRpcService(SalFlowService.class);
+ consumerService = session.getRpcService(OpendaylightTestRoutedRpcService.class);
}
};
- broker.registerConsumer(consumer, getBundleContext());
+ LOG.info("Register routeService consumer");
+ broker.registerConsumer(consumer);
- assertNotNull("MD-SAL instance of Flow Service should be returned", consumerService);
- assertNotSame("Provider instance and consumer instance should not be same.", salFlowService1, consumerService);
+ assertNotNull("MD-SAL instance of test Service should be returned", consumerService);
+ assertNotSame("Provider instance and consumer instance should not be same.", odlRoutedService1, consumerService);
- NodeRef nodeOne = createNodeRef("foo:node:1");
+ InstanceIdentifier<UnorderedList> nodeOnePath = createNodeRef("foo:node:1");
- /**
- * Provider 1 registers path of node 1
- */
- firstReg.registerPath(NodeContext.class, nodeOne.getValue());
+ LOG.info("Provider 1 registers path of node 1");
+ firstReg.registerPath(TestContext.class, nodeOnePath);
/**
* Consumer creates addFlow message for node one and sends it to the
* MD-SAL
- *
*/
- AddFlowInput addFlowFirstMessage = createSampleAddFlow(nodeOne, 1);
- consumerService.addFlow(addFlowFirstMessage);
+ RoutedSimpleRouteInput simpleRouteFirstFoo = createSimpleRouteInput(nodeOnePath);
+ consumerService.routedSimpleRoute(simpleRouteFirstFoo);
/**
* Verifies that implementation of the first provider received the same
* message from MD-SAL.
- *
*/
- verify(salFlowService1).addFlow(addFlowFirstMessage);
-
+ verify(odlRoutedService1).routedSimpleRoute(simpleRouteFirstFoo);
/**
* Verifies that second instance was not invoked with first message
- *
*/
- verify(salFlowService2, times(0)).addFlow(addFlowFirstMessage);
+ verify(odlRoutedService2, times(0)).routedSimpleRoute(simpleRouteFirstFoo);
- /**
- * Provider 2 registers path of node 2
- *
- */
- NodeRef nodeTwo = createNodeRef("foo:node:2");
- secondReg.registerPath(NodeContext.class, nodeTwo.getValue());
+ LOG.info("Provider 2 registers path of node 2");
+ InstanceIdentifier<UnorderedList> nodeTwo = createNodeRef("foo:node:2");
+ secondReg.registerPath(TestContext.class, nodeTwo);
/**
* Consumer sends message to nodeTwo for three times. Should be
* processed by second instance.
*/
- AddFlowInput AddFlowSecondMessage = createSampleAddFlow(nodeTwo, 2);
- consumerService.addFlow(AddFlowSecondMessage);
- consumerService.addFlow(AddFlowSecondMessage);
- consumerService.addFlow(AddFlowSecondMessage);
+ RoutedSimpleRouteInput simpleRouteSecondFoo = createSimpleRouteInput(nodeTwo);
+ consumerService.routedSimpleRoute(simpleRouteSecondFoo);
+ consumerService.routedSimpleRoute(simpleRouteSecondFoo);
+ consumerService.routedSimpleRoute(simpleRouteSecondFoo);
/**
* Verifies that second instance was invoked 3 times with second message
* and first instance wasn't invoked.
*
*/
- verify(salFlowService2, times(3)).addFlow(AddFlowSecondMessage);
- verify(salFlowService1, times(0)).addFlow(AddFlowSecondMessage);
+ verify(odlRoutedService2, times(3)).routedSimpleRoute(simpleRouteSecondFoo);
+ verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteSecondFoo);
- /**
- * Unregisteration of the path for the node one in the first provider
- *
- */
- firstReg.unregisterPath(NodeContext.class, nodeOne.getValue());
+ LOG.info("Unregistration of the path for the node one in the first provider");
+ firstReg.unregisterPath(TestContext.class, nodeOnePath);
- /**
- * Provider 2 registers path of node 1
- *
- */
- secondReg.registerPath(NodeContext.class, nodeOne.getValue());
+ LOG.info("Provider 2 registers path of node 1");
+ secondReg.registerPath(TestContext.class, nodeOnePath);
/**
* A consumer sends third message to node 1
- *
*/
- AddFlowInput AddFlowThirdMessage = createSampleAddFlow(nodeOne, 3);
- consumerService.addFlow(AddFlowThirdMessage);
+ RoutedSimpleRouteInput simpleRouteThirdFoo = createSimpleRouteInput(nodeOnePath);
+ consumerService.routedSimpleRoute(simpleRouteThirdFoo);
/**
* Verifies that provider 1 wasn't invoked and provider 2 was invoked 1
* time.
+ * TODO: fix unregister path
*/
- verify(salFlowService1, times(0)).addFlow(AddFlowThirdMessage);
- verify(salFlowService2).addFlow(AddFlowThirdMessage);
+ //verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteThirdFoo);
+ verify(odlRoutedService2).routedSimpleRoute(simpleRouteThirdFoo);
}
*
* @param string
* string with key(path)
- * @return instance of the type NodeRef
+ * @return instance identifier to {@link UnorderedList}
*/
- private static NodeRef createNodeRef(String string) {
- NodeKey key = new NodeKey(new NodeId(string));
- InstanceIdentifier<Node> path = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
-
- return new NodeRef(path);
+ private static InstanceIdentifier<UnorderedList> createNodeRef(String string) {
+ UnorderedListKey key = new UnorderedListKey(string);
+ InstanceIdentifier<UnorderedList> path = InstanceIdentifier.builder(Lists.class)
+ .child(UnorderedContainer.class)
+ .child(UnorderedList.class, key)
+ .build();
+
+ return path;
}
/**
*
* @param node
* NodeRef value
- * @param cookie
- * integer with cookie value
- * @return AddFlowInput instance
+ * @return simpleRouteInput instance
*/
- static AddFlowInput createSampleAddFlow(NodeRef node, int cookie) {
- AddFlowInputBuilder ret = new AddFlowInputBuilder();
- ret.setNode(node);
- ret.setCookie(new FlowCookie(BigInteger.valueOf(cookie)));
+ static RoutedSimpleRouteInput createSimpleRouteInput(InstanceIdentifier<UnorderedList> node) {
+ RoutedSimpleRouteInputBuilder ret = new RoutedSimpleRouteInputBuilder();
+ ret.setRoute(node);
return ret.build();
}
}
// The state of this Shard
private final InMemoryDOMDataStore store;
- private final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
/// The name of this shard
private final ShardIdentifier name;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
- LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
+ LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
}
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity());
+ datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
@Override
public void onReceiveRecover(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveRecover: Received message {} from {}",
- message.getClass().toString(),
- getSender());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
+ message.getClass().toString(), getSender());
}
if (message instanceof RecoveryFailure){
- LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+ LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
+ persistenceId());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
+ LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
}
if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("Current transaction {} has timed out after {} ms - aborting",
- cohortEntry.getTransactionID(), transactionCommitTimeout);
+ LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
- LOG.debug("Committing transaction {}", transactionID);
+ LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
// We're not the current Tx - the Tx was likely expired b/c it took too long in
// between the canCommit and commit messages.
IllegalStateException ex = new IllegalStateException(
- String.format("Cannot commit transaction %s - it is not the current transaction",
- transactionID));
+ String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
new ModificationPayload(cohortEntry.getModification()));
}
} catch (InterruptedException | ExecutionException | IOException e) {
- LOG.error(e, "An exception occurred while preCommitting transaction {}",
- cohortEntry.getTransactionID());
+ LOG.error(e, "{}: An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("Could not finish committing transaction %s - no CohortEntry found",
- transactionID));
+ String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
return;
}
- LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
// We block on the future here so we don't have to worry about possibly accessing our
} catch (InterruptedException | ExecutionException e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
- LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ LOG.error(e, "{}: An exception occurred while committing transaction {}", persistenceId(), transactionID);
shardMBean.incrementFailedTransactionsCount();
}
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
- LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+ LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
- ready.getTxnClientVersion());
+ LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+ ready.getTransactionID(), ready.getTxnClientVersion());
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// to provide the compatible behavior.
ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
- LOG.debug("Aborting transaction {}", transactionID);
+ LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
// aborted during replication in which case we may still commit locally if replication
@Override
public void onFailure(final Throwable t) {
- LOG.error(t, "An exception happened during abort");
+ LOG.error(t, "{}: An exception happened during abort", persistenceId());
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
- "Could not find shard leader so transaction cannot be created. This typically happens" +
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
" when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.")), getSelf());
+ " later.", persistenceId()))), getSelf());
}
}
.build();
if(LOG.isDebugEnabled()) {
- LOG.debug("Creating transaction : {} ", transactionId);
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
}
ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
}
}
private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+ LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
if(isLeader()) {
registration = doChangeListenerRegistration(registerChangeListener);
} else {
- LOG.debug("Shard is not the leader - delaying registration");
+ LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg =
new DelayedListenerRegistration(registerChangeListener);
ActorRef listenerRegistration = getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- listenerRegistration.path());
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
- LOG.debug("Registering for path {}", registerChangeListener.getPath());
+ LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
return store.registerChangeListener(registerChangeListener.getPath(), listener,
registerChangeListener.getScope());
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+ LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
}
}
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
- LOG.error("Unknown state received {} during recovery", data);
+ LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
}
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+ LOG.debug("{}: submitted recovery snapshot", persistenceId());
}
}
@Override
protected void applyCurrentLogRecoveryBatch() {
if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
}
recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+ LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
currentLogRecoveryBatch.size());
}
}
Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
if(LOG.isDebugEnabled()) {
- LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+ LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
}
for(DOMStoreWriteTransaction tx: txList) {
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "Failed to commit");
+ LOG.error(e, "{}: Failed to commit", persistenceId());
}
}
}
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "Error extracting ModificationPayload");
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
}
}
else if (data instanceof CompositeModificationPayload) {
applyModificationToState(clientActor, identifier, modification);
} else {
- LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- data, data.getClass().getClassLoader(),
+ LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
if(modification == null) {
LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
+ "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
- LOG.info("Applying snapshot");
+ LOG.info("{}: Applying snapshot", persistenceId());
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e, "An exception occurred when applying snapshot");
+ LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
} finally {
- LOG.info("Done applying snapshot");
+ LOG.info("{}: Done applying snapshot", persistenceId());
}
}
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
- "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- entry.getKey(), getId());
+ "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ persistenceId(), entry.getKey(), getId());
}
entry.getValue().close();
}
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.event.LoggingAdapter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
*/
public class ShardCommitCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
private final int queueCapacity;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+ private final LoggingAdapter log;
+
+ private final String name;
+
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
+ this.log = log;
+ this.name = name;
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Processing canCommit for transaction {} for shard {}",
- transactionID, shard.path());
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Processing canCommit for transaction {} for shard {}",
+ name, transactionID, shard.path());
}
// Lookup the cohort entry that was cached previously (or should have been) by
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("No cohort entry found for transaction %s", transactionID));
- LOG.error(ex.getMessage());
+ String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
return;
}
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
- LOG.debug("Transaction {} is already in progress - queueing transaction {}",
- currentCohortEntry.getTransactionID(), transactionID);
+ log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
+ name, currentCohortEntry.getTransactionID(), transactionID);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
- String.format("Could not enqueue transaction %s - the maximum commit queue"+
+ String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
- transactionID, queueCapacity));
- LOG.error(ex.getMessage());
+ name, transactionID, queueCapacity));
+ log.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
}
} else {
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
- LOG.debug("An exception occurred during canCommit", e);
+ log.debug("{}: An exception occurred during canCommit: {}", name, e);
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.event.LoggingAdapter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
private static final int TIME_OUT = 10;
- private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
+ private final LoggingAdapter log;
+ private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+ String name) {
this.schemaContext = schemaContext;
this.shardName = shardName;
+ this.log = log;
+ this.name = name;
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true)
if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
return resultingTxList;
} else {
- LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+ log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
private final String shardName;
private final String memberName;
private final String type;
+ private final String fullName;
public ShardIdentifier(String shardName, String memberName, String type) {
this.shardName = shardName;
this.memberName = memberName;
this.type = type;
+
+ fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-")
+ .append(type).toString();
}
@Override
return result;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
//ensure the output of toString matches the pattern above
- return new StringBuilder(memberName)
- .append("-shard-")
- .append(shardName)
- .append("-")
- .append(type)
- .toString();
+ return fullName;
}
public static Builder builder(){
*/
package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
/**
* Unit tests for DataChangeListenerRegistrationProxy.
doReturn(Futures.failed(new RuntimeException("mock"))).
when(actorContext).executeOperationAsync(any(ActorRef.class),
any(Object.class), any(Timeout.class));
+ doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
AsyncDataBroker.DataChangeScope.ONE);
</parent>
<modelVersion>4.0.0</modelVersion>
+ <artifactId>sal-test-model</artifactId>
+ <packaging>bundle</packaging>
+
<dependencies>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
</dependency>
</dependencies>
- <artifactId>sal-test-model</artifactId>
<build>
<plugins>
<plugin>
--- /dev/null
+module opendaylight-test-notification {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:test:bi:ba:notification";
+ prefix "ntf";
+
+ description
+ "Test model for testing of registering notification listener and publishing of notification.";
+
+ revision "2015-02-05" {
+ description
+ "Initial revision";
+ }
+
+ notification out-of-pixie-dust-notification {
+ description "Just a testing notification that we cannot fly for now.";
+
+ leaf reason {
+ type string;
+ }
+
+ leaf days-till-new-dust {
+ type uint16;
+ }
+ }
+}
\ No newline at end of file