if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
builder.setFlowRef(new FlowRef(identifier));
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalFlowService().removeFlow(builder.build());
if (tableIdValidationPrecondition(tableKey, update)) {
final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setFlowRef(new FlowRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
builder.setUpdatedFlow((new UpdatedFlowBuilder(update)).build());
if (tableIdValidationPrecondition(tableKey, addDataObj)) {
final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setFlowRef(new FlowRef(identifier));
builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
final Group group = (removeDataObj);
final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setGroupRef(new GroupRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalGroupService().removeGroup(builder.build());
final Group updatedGroup = (update);
final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder();
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setGroupRef(new GroupRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
builder.setUpdatedGroup((new UpdatedGroupBuilder(updatedGroup)).build());
final Group group = (addDataObj);
final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setGroupRef(new GroupRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalGroupService().addGroup(builder.build());
final RemoveMeterInputBuilder builder = new RemoveMeterInputBuilder(removeDataObj);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setMeterRef(new MeterRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalMeterService().removeMeter(builder.build());
final UpdateMeterInputBuilder builder = new UpdateMeterInputBuilder();
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setMeterRef(new MeterRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
builder.setUpdatedMeter((new UpdatedMeterBuilder(update)).build());
final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setMeterRef(new MeterRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalMeterService().addMeter(builder.build());
}
} else if (message instanceof PrintState) {
- LOG.debug("State of the node:{} has entries={}, {}",
- getId(), state.size(), getReplicatedLogState());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("State of the node:{} has entries={}, {}",
+ getId(), state.size(), getReplicatedLogState());
+ }
} else if (message instanceof PrintRole) {
- LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ }
} else {
super.onReceiveCommand(message);
} catch (Exception e) {
LOG.error("Exception in applying snapshot", e);
}
- LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+ }
}
private ByteString fromObject(Object snapshot) throws Exception {
*/
public class DefaultConfigParamsImpl implements ConfigParams {
- private static final int SNAPSHOT_BATCH_COUNT = 100000;
+ private static final int SNAPSHOT_BATCH_COUNT = 20000;
/**
* The maximum election time variance
@Override public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
- LOG.debug("SnapshotOffer called..");
+ LOG.info("SnapshotOffer called..");
SnapshotOffer offer = (SnapshotOffer) message;
Snapshot snapshot = (Snapshot) offer.snapshot();
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
- LOG.debug("Applied snapshot to replicatedLog. " +
- "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ LOG.info("Applied snapshot to replicatedLog. " +
+ "snapshotIndex={}, snapshotTerm={}, journal-size={}",
replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
- replicatedLog.size());
+ replicatedLog.size()
+ );
// Apply the snapshot to the actors state
applySnapshot(ByteString.copyFrom(snapshot.getState()));
} else if (message instanceof UpdateElectionTerm) {
context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
} else if (message instanceof RecoveryCompleted) {
- LOG.debug(
- "RecoveryCompleted - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "RecoveryCompleted - Switching actor to Follower - " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size()
+ );
+ }
currentBehavior = switchBehavior(RaftState.Follower);
onStateChanged();
}
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
- applyState.getReplicatedLogEntry().getData());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Applying state for log index {} data {}",
+ applyState.getReplicatedLogEntry().getIndex(),
+ applyState.getReplicatedLogEntry().getData());
+ }
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ snapshot.getLastAppliedTerm()
+ );
+ }
applySnapshot(ByteString.copyFrom(snapshot.getState()));
//clears the followers log, sets the snapshot index to ensure adjusted-index works
context.removePeer(rrp.getName());
} else if (message instanceof CaptureSnapshot) {
- LOG.debug("CaptureSnapshot received by actor");
+ LOG.info("CaptureSnapshot received by actor");
CaptureSnapshot cs = (CaptureSnapshot)message;
captureSnapshot = cs;
createSnapshot();
} else if (message instanceof CaptureSnapshotReply){
- LOG.debug("CaptureSnapshotReply received by actor");
+ LOG.info("CaptureSnapshotReply received by actor");
CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
ByteString stateInBytes = csr.getSnapshot();
- LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+ LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
handleCaptureSnapshotReply(stateInBytes);
} else {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
- LOG.debug("onReceiveCommand: message:" + message.getClass());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveCommand: message:" + message.getClass());
+ }
}
RaftState state =
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
- LOG.debug("Persist data {}", replicatedLogEntry);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Persist data {}", replicatedLogEntry);
+ }
replicatedLog
.appendAndPersist(clientActor, identifier, replicatedLogEntry);
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
- + peerAddress);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
+ }
return peerAddress;
}
lastAppliedTerm = lastAppliedEntry.getTerm();
}
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot Capture logSize: {}", journal.size());
+ LOG.debug("Snapshot Capture lastApplied:{} ",
+ context.getLastApplied());
+ LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+ LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ }
// send a CaptureSnapshot to self to make the expensive operation async.
getSelf().tell(new CaptureSnapshot(
}
@Override public void update(long currentTerm, String votedFor) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ }
this.currentTerm = currentTerm;
this.votedFor = votedFor;
}
return null;
}
+ /**
+ * Find the client request tracker for a specific logIndex
+ *
+ * @param logIndex
+ * @return
+ */
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+ return null;
+ }
+
+
/**
* Find the log index from the previous to last entry in the log
*
i < index + 1; i++) {
ActorRef clientActor = null;
String identifier = null;
- ClientRequestTracker tracker = findClientRequestTracker(i);
+ ClientRequestTracker tracker = removeClientRequestTracker(i);
if (tracker != null) {
clientActor = tracker.getClientActor();
context.getReplicatedLog().get(i);
if (replicatedLogEntry != null) {
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
actor().tell(new ApplyState(clientActor, identifier,
replicatedLogEntry), actor());
newLastApplied = i;
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
- context.getLogger().error(
+ context.getLogger().warning(
"Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
break;
}
}
- // Send a local message to the local RaftActor (it's derived class to be
- // specific to apply the log to it's index)
context.getLogger().debug("Setting last applied to {}", newLastApplied);
context.setLastApplied(newLastApplied);
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.event.LoggingAdapter;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
public class Follower extends AbstractRaftActorBehavior {
private ByteString snapshotChunksCollected = ByteString.EMPTY;
+ private final LoggingAdapter LOG;
+
public Follower(RaftActorContext context) {
super(context);
+ LOG = context.getLogger();
+
scheduleElection(electionDuration());
}
AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
- context.getLogger()
- .debug(appendEntries.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
}
// TODO : Refactor this method into a bunch of smaller methods
// an entry at prevLogIndex and this follower has no entries in
// it's log.
- context.getLogger().debug(
- "The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
+ appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
&& appendEntries.getPrevLogIndex() != -1
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- context.getLogger().debug(
- "The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
+ appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
&& previousEntry != null
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- context.getLogger().debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , previousEntry.getTerm()
- , appendEntries.getPrevLogTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , previousEntry.getTerm()
+ , appendEntries.getPrevLogTerm());
+ }
} else {
outOfSync = false;
}
if (outOfSync) {
// We found that the log was out of sync so just send a negative
// reply and return
- context.getLogger().debug("Follower is out-of-sync, " +
- "so sending negative reply, lastIndex():{}, lastTerm():{}",
- lastIndex(), lastTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Follower is out-of-sync, " +
+ "so sending negative reply, lastIndex():{}, lastTerm():{}",
+ lastIndex(), lastTerm()
+ );
+ }
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
- context.getLogger().debug(
- "Number of entries to be appended = " + appendEntries
- .getEntries().size()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Number of entries to be appended = " + appendEntries
+ .getEntries().size()
+ );
+ }
// 3. If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
continue;
}
- context.getLogger().debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Removing entries from log starting at "
+ + matchEntry.getIndex()
+ );
+ }
// Entries do not match so remove all subsequent entries
context.getReplicatedLog()
}
}
- context.getLogger().debug(
- "After cleanup entries to be added from = " + (addEntriesFrom
- + lastIndex())
- );
+ if(LOG.isDebugEnabled()) {
+ context.getLogger().debug(
+ "After cleanup entries to be added from = " + (addEntriesFrom
+ + lastIndex())
+ );
+ }
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom;
.appendAndPersist(appendEntries.getEntries().get(i));
}
- context.getLogger().debug(
- "Log size is now " + context.getReplicatedLog().size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Log size is now " + context.getReplicatedLog().size());
+ }
}
context.getReplicatedLog().lastIndex()));
if (prevCommitIndex != context.getCommitIndex()) {
- context.getLogger()
- .debug("Commit index set to " + context.getCommitIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Commit index set to " + context.getCommitIndex());
+ }
}
// If commitIndex > lastApplied: increment lastApplied, apply
// check if there are any entries to be applied. last-applied can be equal to last-index
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
- context.getLogger().debug("applyLogToStateMachine, " +
- "appendEntries.getLeaderCommit():{}," +
- "context.getLastApplied():{}, lastIndex():{}",
- appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("applyLogToStateMachine, " +
+ "appendEntries.getLeaderCommit():{}," +
+ "context.getLastApplied():{}, lastIndex():{}",
+ appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
+ );
+ }
+
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
}
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
- context.getLogger().debug("InstallSnapshot received by follower " +
- "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
+ );
+ }
try {
if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
} else {
// we have more to go
snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
- installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
+ installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+ }
}
sender.tell(new InstallSnapshotReply(
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
+import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
private final int minReplicationCount;
+ private final LoggingAdapter LOG;
+
public Leader(RaftActorContext context) {
super(context);
+ LOG = context.getLogger();
+
if (lastIndex() >= 0) {
context.setCommitIndex(lastIndex());
}
followerToLog.put(followerId, followerLogInformation);
}
- context.getLogger().debug("Election:Leader has following peers:"+ followers);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Election:Leader has following peers:" + followers);
+ }
if (followers.size() > 0) {
minReplicationCount = (followers.size() + 1) / 2 + 1;
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
- context.getLogger().debug(appendEntries.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
return state();
}
AppendEntriesReply appendEntriesReply) {
if(! appendEntriesReply.isSuccess()) {
- context.getLogger()
- .debug(appendEntriesReply.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntriesReply.toString());
+ }
}
// Update the FollowerLogInformation
followerToLog.get(followerId);
if(followerLogInformation == null){
- context.getLogger().error("Unknown follower {}", followerId);
+ LOG.error("Unknown follower {}", followerId);
return state();
}
return state();
}
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+
+ ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
+ if(toRemove != null) {
+ trackerList.remove(toRemove);
+ }
+
+ return toRemove;
+ }
+
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
for (ClientRequestTracker tracker : trackerList) {
if (tracker.getIndex() == logIndex) {
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- context.getLogger().debug("InstallSnapshotReply received, " +
- "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InstallSnapshotReply received, " +
+ "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+ reply.getChunkIndex(), followerId,
+ context.getReplicatedLog().getSnapshotIndex() + 1
+ );
+ }
FollowerLogInformation followerLogInformation =
followerToLog.get(followerId);
followerLogInformation.setNextIndex(
context.getReplicatedLog().getSnapshotIndex() + 1);
mapFollowerToSnapshot.remove(followerId);
- context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
+ followerToLog.get(followerId).getNextIndex().get());
+ }
} else {
followerToSnapshot.markSendStatus(true);
}
} else {
- context.getLogger().info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex());
+ LOG.info("InstallSnapshotReply received, " +
+ "sending snapshot chunk failed, Will retry, Chunk:{}",
+ reply.getChunkIndex()
+ );
followerToSnapshot.markSendStatus(false);
}
} else {
- context.getLogger().error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+ LOG.error("ERROR!!" +
+ "FollowerId in InstallSnapshotReply not known to Leader" +
+ " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+ followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+ );
}
}
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- context.getLogger().debug("Replicate message " + logIndex);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Replicate message " + logIndex);
+ }
// Create a tracker entry we will use this later to notify the
// client actor
if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
// if the follower is just not starting and leader's index
// is more than followers index
- context.getLogger().debug("SendInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("SendInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+ );
+ }
actor().tell(new SendInstallSnapshot(), actor());
} else {
).toSerializable(),
actor()
);
- context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
mapFollowerToSnapshot.get(followerId).getTotalChunks());
} catch (IOException e) {
- context.getLogger().error("InstallSnapshot failed for Leader.", e);
+ LOG.error("InstallSnapshot failed for Leader.", e);
}
}
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
ByteString nextChunk = followerToSnapshot.getNextChunk();
- context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ }
return nextChunk;
}
int size = snapshotBytes.size();
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot {} bytes, total chunks to send:{}",
+ size, totalChunks);
+ }
}
public ByteString getSnapshotBytes() {
}
}
- context.getLogger().debug("length={}, offset={},size={}",
- snapshotLength, start, size);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("length={}, offset={},size={}",
+ snapshotLength, start, size);
+ }
return getSnapshotBytes().substring(start, start + size);
}
package org.opendaylight.controller.cluster.raft.messages;
import com.google.protobuf.ByteString;
-import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
public class InstallSnapshot extends AbstractRaftRPC {
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
-import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Logging.getLogger(getContext().system(), this);
public AbstractUntypedActor() {
- LOG.debug("Actor created {}", getSelf());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor created {}", getSelf());
+ }
getContext().
system().
actorSelection("user/termination-monitor").
@Override public void onReceive(Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
- LOG.debug("Received message {}", messageType);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received message {}", messageType);
+ }
handleReceive(message);
-
- LOG.debug("Done handling message {}", messageType);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Done handling message {}", messageType);
+ }
}
protected abstract void handleReceive(Object message) throws Exception;
}
protected void unknownMessage(Object message) throws Exception {
- LOG.debug("Received unhandled message {}", message);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received unhandled message {}", message);
+ }
unhandled(message);
}
}
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: InstallSnapshot.proto
-package org.opendaylight.controller.cluster.raft.protobuff.messages;
+package org.opendaylight.controller.protobuff.messages.cluster.raft;
public final class InstallSnapshotMessages {
private InstallSnapshotMessages() {}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class);
}
public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
- com.google.protobuf.ByteString bs =
+ com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
getLeaderIdBytes() {
java.lang.Object ref = leaderId_;
if (ref instanceof java.lang.String) {
- com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
leaderId_ = b;
return super.writeReplace();
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
- public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+ public static Builder newBuilder(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
- implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+ implements org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshotOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class);
}
- // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ // Construct using org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
- public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
}
- public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot build() {
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
- public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
}
public Builder mergeFrom(com.google.protobuf.Message other) {
- if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
- return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+ if (other instanceof org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) {
+ return mergeFrom((org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot)other);
} else {
super.mergeFrom(other);
return this;
}
}
- public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
- if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+ public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot other) {
+ if (other == org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
if (other.hasTerm()) {
setTerm(other.getTerm());
}
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
- parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+ parsedMessage = (org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
getLeaderIdBytes() {
java.lang.Object ref = leaderId_;
if (ref instanceof String) {
- com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
leaderId_ = b;
"\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
"Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
" \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
- "light.controller.cluster.raft.protobuff." +
- "messagesB\027InstallSnapshotMessagesH\001"
+ "light.controller.protobuff.messages.clus" +
+ "ter.raftB\027InstallSnapshotMessagesH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
for (Entry<URI, String> e: prefixes.getPrefixes()) {
writer.writeNamespace(e.getValue(), e.getKey().toString());
}
- LOG.debug("Instance identifier with Random prefix is now {}", str);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Instance identifier with Random prefix is now {}", str);
+ }
writer.writeCharacters(str);
}
DataSchemaNode childSchema = null;
if (schema instanceof DataNodeContainer) {
childSchema = SchemaUtils.findFirstSchema(child.getNodeType(), ((DataNodeContainer) schema).getChildNodes()).orNull();
- if (childSchema == null) {
+ if (childSchema == null && LOG.isDebugEnabled()) {
LOG.debug("Probably the data node \"{}\" does not conform to schema", child == null ? "" : child.getNodeType().getLocalName());
}
}
*/
public void writeValue(final @Nonnull XMLStreamWriter writer, final @Nonnull TypeDefinition<?> type, final Object value) throws XMLStreamException {
if (value == null) {
- LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName());
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Value of {}:{} is null, not encoding it", type.getQName().getNamespace(), type.getQName().getLocalName());
+ }
return;
}
writer.writeNamespace(prefix, qname.getNamespace().toString());
writer.writeCharacters(prefix + ':' + qname.getLocalName());
} else {
- LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Value of {}:{} is not a QName but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+ }
writer.writeCharacters(String.valueOf(value));
}
}
private static void write(final @Nonnull XMLStreamWriter writer, final @Nonnull InstanceIdentifierTypeDefinition type, final @Nonnull Object value) throws XMLStreamException {
if (value instanceof YangInstanceIdentifier) {
- LOG.debug("Writing InstanceIdentifier object {}", value);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Writing InstanceIdentifier object {}", value);
+ }
write(writer, (YangInstanceIdentifier)value);
} else {
- LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
- writer.writeCharacters(String.valueOf(value));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Value of {}:{} is not an InstanceIdentifier but {}", type.getQName().getNamespace(), type.getQName().getLocalName(), value.getClass());
+ }
+ writer.writeCharacters(String.valueOf(value));
}
}
}
* @return xml String
*/
public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
- LOG.debug("Converting input composite node to xml {}", cNode);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converting input composite node to xml {}", cNode);
+ }
if (cNode == null) {
return BLANK;
}
Set<RpcDefinition> rpcs = schemaContext.getOperations();
for(RpcDefinition rpc : rpcs) {
if(rpc.getQName().equals(cNode.getNodeType())){
- LOG.debug("Found the rpc definition from schema context matching with input composite node {}", rpc.getQName());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Found the rpc definition from schema context matching with input composite node {}", rpc.getQName());
+ }
CompositeNode inputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "input"));
domTree = XmlDocumentUtils.toDocument(inputContainer, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider());
-
- LOG.debug("input composite node to document conversion complete, document is {}", domTree);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("input composite node to document conversion complete, document is {}", domTree);
+ }
break;
}
}
* @return xml string
*/
public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
- LOG.debug("Converting output composite node to xml {}", cNode);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converting output composite node to xml {}", cNode);
+ }
if (cNode == null) {
return BLANK;
}
Set<RpcDefinition> rpcs = schemaContext.getOperations();
for(RpcDefinition rpc : rpcs) {
if(rpc.getQName().equals(cNode.getNodeType())){
- LOG.debug("Found the rpc definition from schema context matching with output composite node {}", rpc.getQName());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Found the rpc definition from schema context matching with output composite node {}", rpc.getQName());
+ }
CompositeNode outputContainer = cNode.getFirstCompositeByName(QName.create(cNode.getNodeType(), "output"));
domTree = XmlDocumentUtils.toDocument(outputContainer, rpc.getOutput(), XmlDocumentUtils.defaultValueCodecProvider());
-
- LOG.debug("output composite node to document conversion complete, document is {}", domTree);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("output composite node to document conversion complete, document is {}", domTree);
+ }
break;
}
}
LOG.error("Error during translation of Document to OutputStream", e);
}
- LOG.debug("Document to string conversion complete, xml string is {} ", writer.toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Document to string conversion complete, xml string is {} ", writer.toString());
+ }
return writer.toString();
}
* @return CompositeNode object based on the input, if any of the input parameter is null, a null object is returned
*/
public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml, SchemaContext schemaContext){
- LOG.debug("Converting input xml to composite node {}", xml);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converting input xml to composite node {}", xml);
+ }
if (xml==null || xml.length()==0) {
return null;
}
Set<RpcDefinition> rpcs = schemaContext.getOperations();
for(RpcDefinition rpcDef : rpcs) {
if(rpcDef.getQName().equals(rpc)){
- LOG.debug("found the rpc definition from schema context matching rpc {}", rpc);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("found the rpc definition from schema context matching rpc {}", rpc);
+ }
if(rpcDef.getInput() == null) {
LOG.warn("found rpc definition's input is null");
return null;
List<Node<?>> dataNodes = XmlDocumentUtils.toDomNodes(xmlData,
Optional.of(rpcDef.getInput().getChildNodes()), schemaContext);
-
- LOG.debug("Converted xml input to list of nodes {}", dataNodes);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Converted xml input to list of nodes {}", dataNodes);
+ }
final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
it.setQName(rpc);
it.add(ImmutableCompositeNode.create(input, dataNodes));
} catch (IOException e) {
LOG.error("Error during building data tree from XML", e);
}
-
- LOG.debug("Xml to composite node conversion complete {} ", compositeNode);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Xml to composite node conversion complete {} ", compositeNode);
+ }
return compositeNode;
}
package org.opendaylight.controller.cluster.raft;
-option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_package = "org.opendaylight.controller.protobuff.messages.cluster.raft";
option java_outer_classname = "InstallSnapshotMessages";
option optimize_for = SPEED;
import javax.annotation.Nullable;
import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* MXBean implementation of the ThreadExecutorStatsMXBean interface that retrieves statistics
*/
public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean
implements ThreadExecutorStatsMXBean {
-
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadExecutorStatsMXBeanImpl.class);
private final ThreadPoolExecutor executor;
/**
* @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
* @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
*/
- public ThreadExecutorStatsMXBeanImpl(Executor executor, String mBeanName,
- String mBeanType, @Nullable String mBeanCategory) {
+ public ThreadExecutorStatsMXBeanImpl(final ThreadPoolExecutor executor, final String mBeanName,
+ final String mBeanType, @Nullable final String mBeanCategory) {
super(mBeanName, mBeanType, mBeanCategory);
+ this.executor = Preconditions.checkNotNull(executor);
+ }
+
+ /**
+ * Create a new bean for the statistics, which is already registered.
+ *
+ * @param executor
+ * @param mBeanName
+ * @param mBeanType
+ * @param mBeanCategory
+ * @return
+ */
+ public static ThreadExecutorStatsMXBeanImpl create(final Executor executor, final String mBeanName,
+ final String mBeanType, @Nullable final String mBeanCategory) {
+ if (executor instanceof ThreadPoolExecutor) {
+ final ThreadExecutorStatsMXBeanImpl ret = new ThreadExecutorStatsMXBeanImpl((ThreadPoolExecutor) executor, mBeanName, mBeanType, mBeanCategory);
+ ret.registerMBean();
+ return ret;
+ }
- Preconditions.checkArgument(executor instanceof ThreadPoolExecutor,
- "The ExecutorService of type {} is not an instanceof ThreadPoolExecutor",
- executor.getClass());
- this.executor = (ThreadPoolExecutor)executor;
+ LOG.info("Executor {} is not supported", executor);
+ return null;
}
@Override
Preconditions.checkNotNull(path, "path should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
-
- LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+ }
ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
DataChangeListener.props(listener ));
}, actorContext.getActorSystem().dispatcher());
return listenerRegistrationProxy;
}
-
- LOG.debug(
- "No local shard for shardName {} was found so returning a noop registration",
- shardName);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "No local shard for shardName {} was found so returning a noop registration",
+ shardName);
+ }
return new NoOpDataChangeListenerRegistration(listener);
}
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
}
@Override public void onReceiveRecover(Object message) {
- LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
- getSender());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveRecover: Received message {} from {}",
+ message.getClass().toString(),
+ getSender());
+ }
if (message instanceof RecoveryFailure){
LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
}
@Override public void onReceiveCommand(Object message) {
- LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
- getSender());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveCommand: Received message {} from {}",
+ message.getClass().toString(),
+ getSender());
+ }
if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
// This must be for install snapshot. Don't want to open this up and trigger
.tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
self());
+ createSnapshotTransaction = null;
// Send a PoisonPill instead of sending close transaction because we do not really need
// a response
getSender().tell(PoisonPill.getInstance(), self());
ShardTransactionIdentifier.builder()
.remoteTransactionId(remoteTransactionId)
.build();
- LOG.debug("Creating transaction : {} ", transactionId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Creating transaction : {} ", transactionId);
+ }
ActorRef transactionActor =
createTypedTransactionActor(transactionType, transactionId, transactionChainId);
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
- LOG.debug(
- "Could not find cohort for modification : {}. Writing modification using a new transaction",
- modification);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Could not find cohort for modification : {}. Writing modification using a new transaction",
+ modification);
+ }
+
DOMStoreWriteTransaction transaction =
store.newWriteOnlyTransaction();
- LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+ }
modification.apply(transaction);
try {
return;
}
- final ListenableFuture<Void> future = cohort.commit();
- final ActorRef self = getSelf();
+ ListenableFuture<Void> future = cohort.commit();
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void v) {
- sender.tell(new CommitTransactionReply().toSerializable(), self);
+ sender.tell(new CommitTransactionReply().toSerializable(), getSelf());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
}
public void onFailure(Throwable t) {
LOG.error(t, "An exception happened during commit");
shardMBean.incrementFailedTransactionsCount();
- sender.tell(new akka.actor.Status.Failure(t), self);
+ sender.tell(new akka.actor.Status.Failure(t), getSelf());
}
});
private void registerChangeListener(
RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener
- .getPath());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("registerDataChangeListener for {}", registerChangeListener
+ .getPath());
+ }
ActorSelection dataChangeListenerPath = getContext()
getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug(
- "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
- , listenerRegistration.path().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
+ , listenerRegistration.path().toString());
+ }
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
getSelf());
}
- private void createTransactionChain() {
- DOMStoreTransactionChain chain = store.createTransactionChain();
- ActorRef transactionChain = getContext().actorOf(
- ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
- getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
- getSelf());
- }
-
private boolean isMetricsCaptureEnabled(){
CommonConfig config = new CommonConfig(getContext().system().settings().config());
return config.isMetricCaptureEnabled();
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
+
+ LOG.info("Applying snapshot");
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
syncCommitTransaction(transaction);
} catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred when applying snapshot");
+ } finally {
+ LOG.info("Done applying snapshot");
}
}
.tell(new EnableNotification(isLeader()), getSelf());
}
-
shardMBean.setRaftState(getRaftState().name());
shardMBean.setCurrentTerm(getCurrentTerm());
// If this actor is no longer the leader close all the transaction chains
if(!isLeader()){
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
- LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ entry.getKey(), getId());
+ }
entry.getValue().close();
}
}
@Override protected void onLeaderChanged(String oldLeader, String newLeader) {
- if((oldLeader == null && newLeader == null) || (newLeader != null && newLeader.equals(oldLeader)) ){
- return;
- }
- LOG.info("Current state = {}, Leader = {}", getRaftState().name(), newLeader);
shardMBean.setLeader(newLeader);
}
peerAddress);
if(peerAddresses.containsKey(peerId)){
peerAddresses.put(peerId, peerAddress);
-
- LOG.debug(
- "Sending PeerAddressResolved for peer {} with address {} to {}",
- peerId, peerAddress, actor.path());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
+ }
actor
.tell(new PeerAddressResolved(peerId, peerAddress),
getSelf());
getSender().tell(new GetCompositeModificationReply(
new ImmutableCompositeModification(modification)), getSelf());
} else if (message instanceof ReceiveTimeout) {
- LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+ }
closeTransaction(false);
} else {
throw new UnknownMessageException(message);
protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
modification.addModification(
new WriteModification(message.getPath(), message.getData(),schemaContext));
- LOG.debug("writeData at path : " + message.getPath().toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("writeData at path : " + message.getPath().toString());
+ }
try {
transaction.write(message.getPath(), message.getData());
getSender().tell(new WriteDataReply().toSerializable(), getSelf());
protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
modification.addModification(
new MergeModification(message.getPath(), message.getData(), schemaContext));
- LOG.debug("mergeData at path : " + message.getPath().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("mergeData at path : " + message.getPath().toString());
+ }
try {
transaction.merge(message.getPath(), message.getData());
getSender().tell(new MergeDataReply().toSerializable(), getSelf());
}
protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
- LOG.debug("deleteData at path : " + message.getPath().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("deleteData at path : " + message.getPath().toString());
+ }
modification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
@Override public void onReceive(Object message) throws Exception {
if(message instanceof Terminated){
Terminated terminated = (Terminated) message;
- LOG.debug("Actor terminated : {}", terminated.actor());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ }
} else if(message instanceof Monitor){
Monitor monitor = (Monitor) message;
getContext().watch(monitor.getActorRef());
private void commit(CommitTransaction message) {
// Forward the commit to the shard
- log.debug("Forward commit transaction to Shard {} ", shardActor);
+ if(log.isDebugEnabled()) {
+ log.debug("Forward commit transaction to Shard {} ", shardActor);
+ }
shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
getContext());
@Override
public Void apply(Iterable<ActorPath> paths) {
cohortPaths = Lists.newArrayList(paths);
-
- LOG.debug("Tx {} successfully built cohort path list: {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} successfully built cohort path list: {}",
transactionId, cohortPaths);
+ }
return null;
}
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
@Override
public ListenableFuture<Boolean> canCommit() {
- LOG.debug("Tx {} canCommit", transactionId);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} canCommit", transactionId);
+ }
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
// The first phase of canCommit is to gather the list of cohort actor paths that will
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ }
returnFuture.setException(failure);
} else {
finishCanCommit(returnFuture);
}
private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
-
- LOG.debug("Tx {} finishCanCommit", transactionId);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finishCanCommit", transactionId);
+ }
// The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
// their canCommit processing. If any one fails then we'll fail canCommit.
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
+ }
returnFuture.setException(failure);
return;
}
return;
}
}
-
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+ }
returnFuture.set(Boolean.valueOf(result));
}
}, actorContext.getActorSystem().dispatcher());
private Future<Iterable<Object>> invokeCohorts(Object message) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
for(ActorPath actorPath : cohortPaths) {
-
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
+ }
ActorSelection cohort = actorContext.actorSelection(actorPath);
futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException) {
- LOG.debug("Tx {} {}", transactionId, operationName);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} {}", transactionId, operationName);
+ }
final SettableFuture<Void> returnFuture = SettableFuture.create();
// The cohort actor list should already be built at this point by the canCommit phase but,
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
operationName, failure);
-
+ }
if(propagateException) {
returnFuture.setException(failure);
} else {
private void finishVoidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException,
final SettableFuture<Void> returnFuture) {
-
- LOG.debug("Tx {} finish {}", transactionId, operationName);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
+ }
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
}
if(exceptionToPropagate != null) {
- LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
operationName, exceptionToPropagate);
-
+ }
if(propagateException) {
// We don't log the exception here to avoid redundant logging since we're
// propagating to the caller in MD-SAL core who will log it.
// Since the caller doesn't want us to propagate the exception we'll also
// not log it normally. But it's usually not good to totally silence
// exceptions so we'll log it to debug level.
- LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
exceptionToPropagate);
+ }
returnFuture.set(null);
}
} else {
- LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
+ }
returnFuture.set(null);
}
}
new TransactionProxyCleanupPhantomReference(this);
phantomReferenceCache.put(cleanup, cleanup);
}
-
- LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ }
}
@Override
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Read operation on write-only transaction is not allowed");
- LOG.debug("Tx {} read {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} read {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
return transactionContext(path).readData(path);
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
- LOG.debug("Tx {} exists {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} exists {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
return transactionContext(path).dataExists(path);
checkModificationState();
- LOG.debug("Tx {} write {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} write {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
transactionContext(path).writeData(path, data);
checkModificationState();
- LOG.debug("Tx {} merge {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} merge {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
transactionContext(path).mergeData(path, data);
public void delete(YangInstanceIdentifier path) {
checkModificationState();
-
- LOG.debug("Tx {} delete {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} delete {}", identifier, path);
+ }
createTransactionIfMissing(actorContext, path);
transactionContext(path).deleteData(path);
inReadyState = true;
- LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
-
+ }
List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- LOG.debug("Tx {} Readying transaction for shard {}", identifier,
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
-
+ }
cohortPathFutures.add(transactionContext.readyTransaction());
}
String transactionPath = reply.getTransactionPath();
- LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
+ }
ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
if (transactionType == TransactionType.READ_ONLY) {
"Invalid reply type {} for CreateTransaction", response.getClass()));
}
} catch (Exception e) {
- LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ }
remoteTransactionPaths
.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
}
@Override
public void closeTransaction() {
- LOG.debug("Tx {} closeTransaction called", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} closeTransaction called", identifier);
+ }
actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
public Future<ActorPath> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
-
+ }
// Send the ReadyTransaction message to the Tx actor.
final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
@Override
public ActorPath apply(Iterable<Object> notUsed) {
-
- LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
-
+ }
// At this point all the Futures succeeded and we need to extract the cohort
// actor path from the ReadyTransactionReply. For the recorded operations, they
// don't return any data so we're only interested that they completed
String resolvedCohortPath = getResolvedCohortPath(
reply.getCohortPath().toString());
- LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
identifier, resolvedCohortPath);
-
+ }
return actorContext.actorFor(resolvedCohortPath);
} else {
// Throwing an exception here will fail the Future.
@Override
public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ }
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
new DeleteData(path).toSerializable() ));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ }
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
new MergeData(path, data, schemaContext).toSerializable()));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ }
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
new WriteData(path, data, schemaContext).toSerializable()));
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
+ }
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
// If there were any previous recorded put/merge/delete operation reply Futures then we
if(recordedOperationFutures.isEmpty()) {
finishReadData(path, returnFuture);
} else {
- LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData: verifying {} previous recorded operations",
identifier, recordedOperationFutures.size());
-
+ }
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
public void onComplete(Throwable failure, Iterable<Object> notUsed)
throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} readData: a recorded operation failed: {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData: a recorded operation failed: {}",
identifier, failure);
-
+ }
returnFuture.setException(new ReadFailedException(
"The read could not be performed because a previous put, merge,"
+ "or delete operation failed", failure));
private void finishReadData(final YangInstanceIdentifier path,
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
- LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+ }
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object readResponse) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} read operation failed: {}", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+ }
returnFuture.setException(new ReadFailedException(
"Error reading data for path " + path, failure));
} else {
- LOG.debug("Tx {} read operation succeeded", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} read operation succeeded", identifier, failure);
+ }
if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
path, readResponse);
public CheckedFuture<Boolean, ReadFailedException> dataExists(
final YangInstanceIdentifier path) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ }
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
// If there were any previous recorded put/merge/delete operation reply Futures then we
if(recordedOperationFutures.isEmpty()) {
finishDataExists(path, returnFuture);
} else {
- LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
identifier, recordedOperationFutures.size());
-
+ }
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
public void onComplete(Throwable failure, Iterable<Object> notUsed)
throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
identifier, failure);
-
+ }
returnFuture.setException(new ReadFailedException(
"The data exists could not be performed because a previous "
+ "put, merge, or delete operation failed", failure));
private void finishDataExists(final YangInstanceIdentifier path,
final SettableFuture<Boolean> returnFuture) {
- LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+ }
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+ }
returnFuture.setException(new ReadFailedException(
"Error checking data exists for path " + path, failure));
} else {
- LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+ }
if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
returnFuture.set(Boolean.valueOf(DataExistsReply.
fromSerializable(response).exists()));
@Override
public void closeTransaction() {
- LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+ }
}
@Override
public Future<ActorPath> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readyTransaction called", identifier);
+ }
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ }
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ }
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ }
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
+ }
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
@Override
public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ }
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
}
public void setDataStoreExecutor(ExecutorService dsExecutor) {
- this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+ this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor,
"notification-executor", getMBeanType(), getMBeanCategory());
}
this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
"notification-manager", getMBeanType(), getMBeanCategory());
- this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(),
"data-store-executor", getMBeanType(), getMBeanCategory());
}
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
- LOG.debug("Local shard found {}", found.getPath());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Local shard found {}", found.getPath());
+ }
return found.getPath();
}
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
- LOG.debug("Primary found {}", found.getPrimaryPath());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Primary found {}", found.getPrimaryPath());
+ }
return found.getPrimaryPath();
}
throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
*/
public Object executeRemoteOperation(ActorSelection actor, Object message) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
- actor.toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
+ actor.toString());
+ }
Future<Object> future = ask(actor, message, operationTimeout);
try {
*/
public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+ }
return ask(actor, message, operationTimeout);
}
System.setProperty("shard.persistent", "false");
system = ActorSystem.create("test");
+
+ deletePersistenceFiles();
}
@AfterClass
public static void tearDownClass() throws IOException {
JavaTestKit.shutdownActorSystem(system);
system = null;
+
+ deletePersistenceFiles();
}
protected static void deletePersistenceFiles() throws IOException {
subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
getRef());
- waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+ subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
+ getRef());
+
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
}
};
- Thread.sleep(2000);
deletePersistenceFiles();
}};
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.collect.Iterables;
+import scala.concurrent.Future;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InMemorySnapshotStore extends SnapshotStore {
+
+ Map<String, List<Snapshot>> snapshots = new HashMap<>();
+
+ @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+ SnapshotSelectionCriteria snapshotSelectionCriteria) {
+ List<Snapshot> snapshotList = snapshots.get(s);
+ if(snapshotList == null){
+ return Futures.successful(Option.<SelectedSnapshot>none());
+ }
+
+ Snapshot snapshot = Iterables.getLast(snapshotList);
+ SelectedSnapshot selectedSnapshot =
+ new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
+ return Futures.successful(Option.some(selectedSnapshot));
+ }
+
+ @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+ List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+ if(snapshotList == null){
+ snapshotList = new ArrayList<>();
+ snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
+ }
+ snapshotList.add(new Snapshot(snapshotMetadata, o));
+
+ return Futures.successful(null);
+ }
+
+ @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+ }
+
+ @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+ List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+ if(snapshotList == null){
+ return;
+ }
+
+ int deleteIndex = -1;
+
+ for(int i=0;i<snapshotList.size(); i++){
+ Snapshot snapshot = snapshotList.get(i);
+ if(snapshotMetadata.equals(snapshot.getMetadata())){
+ deleteIndex = i;
+ break;
+ }
+ }
+
+ if(deleteIndex != -1){
+ snapshotList.remove(deleteIndex);
+ }
+
+ }
+
+ @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+ throws Exception {
+ List<Snapshot> snapshotList = snapshots.get(s);
+
+ if(snapshotList == null){
+ return;
+ }
+
+ // TODO : This is a quick and dirty implementation. Do actual match later.
+ snapshotList.clear();
+ snapshots.remove(s);
+ }
+
+ private static class Snapshot {
+ private final SnapshotMetadata metadata;
+ private final Object data;
+
+ private Snapshot(SnapshotMetadata metadata, Object data) {
+ this.metadata = metadata;
+ this.data = data;
+ }
+
+ public SnapshotMetadata getMetadata() {
+ return metadata;
+ }
+
+ public Object getData() {
+ return data;
+ }
+ }
+}
akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+
loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
actor {
}
}
}
+
+in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl;
newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE);
commitStatsMXBean.registerMBean();
- final ThreadExecutorStatsMXBeanImpl commitExecutorStatsMXBean =
- new ThreadExecutorStatsMXBeanImpl(commitExecutor, "CommitExecutorStats",
+ final AbstractMXBean commitExecutorStatsMXBean =
+ ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats",
JMX_BEAN_TYPE, null);
- commitExecutorStatsMXBean.registerMBean();
-
- final ThreadExecutorStatsMXBeanImpl commitFutureStatsMXBean =
- new ThreadExecutorStatsMXBeanImpl(listenableFutureExecutor,
+ final AbstractMXBean commitFutureStatsMXBean =
+ ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor,
"CommitFutureExecutorStats", JMX_BEAN_TYPE, null);
- commitFutureStatsMXBean.registerMBean();
newDataBroker.setCloseable(new AutoCloseable() {
@Override
public void close() {
commitStatsMXBean.unregisterMBean();
- commitExecutorStatsMXBean.unregisterMBean();
- commitFutureStatsMXBean.unregisterMBean();
+ if (commitExecutorStatsMXBean != null) {
+ commitExecutorStatsMXBean.unregisterMBean();
+ }
+ if (commitFutureStatsMXBean != null) {
+ commitFutureStatsMXBean.unregisterMBean();
+ }
}
});
out.print(prompt);
char c = 0;
byte data[] = new byte[1];
- while (c != '\n') {
+ while (!socket.isClosed() && socket.isConnected() && !socket.isInputShutdown() && c != '\n') {
try {
in.read(data);
c = (char) data[0];
inputString.append(c);
} catch (Exception err) {
err.printStackTrace(out);
+ stopped = true;
+ break;
}
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
- private final ListeningExecutorService listeningExecutor;
private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
private final ExecutorService dataChangeListenerExecutor;
-
- private final ExecutorService domStoreExecutor;
+ private final ListeningExecutorService commitExecutor;
private final boolean debugTransactions;
private final String name;
private volatile AutoCloseable closeable;
- public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
final ExecutorService dataChangeListenerExecutor) {
- this(name, domStoreExecutor, dataChangeListenerExecutor,
+ this(name, commitExecutor, dataChangeListenerExecutor,
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
}
- public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
final boolean debugTransactions) {
this.name = Preconditions.checkNotNull(name);
- this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor);
- this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor);
+ this.commitExecutor = Preconditions.checkNotNull(commitExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
this.debugTransactions = debugTransactions;
}
public ExecutorService getDomStoreExecutor() {
- return domStoreExecutor;
+ return commitExecutor;
}
@Override
@Override
public void close() {
- ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+ ExecutorServiceUtil.tryGracefulShutdown(commitExecutor, 30, TimeUnit.SECONDS);
ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
if(closeable != null) {
@Override
public ListenableFuture<Boolean> canCommit() {
- return listeningExecutor.submit(new Callable<Boolean>() {
+ return commitExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws TransactionCommitFailedException {
try {
@Override
public ListenableFuture<Void> preCommit() {
- return listeningExecutor.submit(new Callable<Void>() {
+ return commitExecutor.submit(new Callable<Void>() {
@Override
public Void call() {
candidate = dataTree.prepare(modification);
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
@Nullable final InMemoryDOMDataStoreConfigProperties properties) {
InMemoryDOMDataStoreConfigProperties actualProperties = properties;
- if(actualProperties == null) {
+ if (actualProperties == null) {
actualProperties = InMemoryDOMDataStoreConfigProperties.getDefault();
}
// task execution time to get higher throughput as DataChangeListeners typically provide
// much of the business logic for a data model. If the executor queue size limit is reached,
// subsequent submitted notifications will block the calling thread.
-
int dclExecutorMaxQueueSize = actualProperties.getMaxDataChangeExecutorQueueSize();
int dclExecutorMaxPoolSize = actualProperties.getMaxDataChangeExecutorPoolSize();
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
- ExecutorService domStoreExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
- actualProperties.getMaxDataStoreExecutorQueueSize(), "DOMStore-" + name );
-
- InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
- domStoreExecutor, dataChangeListenerExecutor,
+ final ListeningExecutorService commitExecutor = MoreExecutors.sameThreadExecutor();
+ final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
+ commitExecutor, dataChangeListenerExecutor,
actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
- if(schemaService != null) {
+ if (schemaService != null) {
schemaService.registerSchemaContextListener(dataStore);
}
package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
import java.util.concurrent.ExecutorService;
-
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
*/
public class InMemoryDataStoreStats implements AutoCloseable {
- private final ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
- private final ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+ private final AbstractMXBean notificationExecutorStatsBean;
+ private final AbstractMXBean dataStoreExecutorStatsBean;
private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
- public InMemoryDataStoreStats(String mBeanType, QueuedNotificationManager<?, ?> manager,
- ExecutorService dataStoreExecutor) {
+ public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager,
+ final ExecutorService dataStoreExecutor) {
- this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+ notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
"notification-manager", mBeanType, null);
notificationManagerStatsBean.registerMBean();
- this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(),
"notification-executor", mBeanType, null);
- this.notificationExecutorStatsBean.registerMBean();
+ if (notificationExecutorStatsBean != null) {
+ notificationExecutorStatsBean.registerMBean();
+ }
- this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dataStoreExecutor,
+ dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dataStoreExecutor,
"data-store-executor", mBeanType, null);
- this.dataStoreExecutorStatsBean.registerMBean();
+ if (dataStoreExecutorStatsBean != null) {
+ dataStoreExecutorStatsBean.registerMBean();
+ }
}
@Override
Thread.currentThread().getContextClassLoader());
Config actorSystemConfig = config.get();
- LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
+ }
if (config.isMetricCaptureEnabled()) {
LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
config.getActorSystemName());
* @param announcements
*/
private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
- LOG.debug("Announcing [{}]", announcements);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Announcing [{}]", announcements);
+ }
RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
}
* @param removals
*/
private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
- LOG.debug("Removing [{}]", removals);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Removing [{}]", removals);
+ }
RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
}
}
private void invokeRemoteRpc(final InvokeRpc msg) {
- LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+ }
RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
null, msg.getRpc(), msg.getIdentifier());
RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
}
private void executeRpc(final ExecuteRpc msg) {
- LOG.debug("Executing rpc {}", msg.getRpc());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Executing rpc {}", msg.getRpc());
+ }
Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
schemaContext));
@Override
public void onRpcImplementationAdded(QName rpc) {
- LOG.debug("Adding registration for [{}]", rpc);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Adding registration for [{}]", rpc);
+ }
RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
routeIds.add(routeId);
@Override
public void onRpcImplementationRemoved(QName rpc) {
- LOG.debug("Removing registration for [{}]", rpc);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Removing registration for [{}]", rpc);
+ }
RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc, null);
List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
routeIds.add(routeId);
@Override public void onReceive(Object message) throws Exception {
if(message instanceof Terminated){
Terminated terminated = (Terminated) message;
- LOG.debug("Actor terminated : {}", terminated.actor());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ }
}else if(message instanceof Monitor){
Monitor monitor = (Monitor) message;
getContext().watch(monitor.getActorRef());
receiveUpdateRemoteBuckets(
((UpdateRemoteBuckets) message).getBuckets());
} else {
- log.debug("Unhandled message [{}]", message);
+ if(log.isDebugEnabled()) {
+ log.debug("Unhandled message [{}]", message);
+ }
unhandled(message);
}
}
versions.put(entry.getKey(), remoteVersion);
}
}
-
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ if(log.isDebugEnabled()) {
+ log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ }
}
///
}
clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
if (!clusterMembers.contains(member.address()))
clusterMembers.add(member.address());
-
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
-
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ if(log.isDebugEnabled()) {
+ log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ }
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if(log.isDebugEnabled()) {
+ log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ }
return;
}
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- log.debug("Sending bucket versions to [{}]", remoteRef);
+ if(log.isDebugEnabled()) {
+ log.debug("Sending bucket versions to [{}]", remoteRef);
+ }
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ if(log.isDebugEnabled()) {
+ log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ }
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
-
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
- private final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
private final InstanceIdentifier<Topology> topology;
private final OperationProcessor processor;
- FlowCapableTopologyExporter(final OperationProcessor processor, final InstanceIdentifier<Topology> topology) {
+ FlowCapableTopologyExporter(final OperationProcessor processor,
+ final InstanceIdentifier<Topology> topology) {
this.processor = Preconditions.checkNotNull(processor);
this.topology = Preconditions.checkNotNull(topology);
}
processor.enqueueOperation(new TopologyOperation() {
@Override
- public void applyOperation(final ReadWriteTransaction transaction) {
- removeAffectedLinks(nodeId);
+ public void applyOperation(ReadWriteTransaction transaction) {
+ removeAffectedLinks(nodeId, transaction);
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
}
- });
- processor.enqueueOperation(new TopologyOperation() {
@Override
- public void applyOperation(ReadWriteTransaction transaction) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+ public String toString() {
+ return "onNodeRemoved";
}
});
}
final InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
transaction.merge(LogicalDatastoreType.OPERATIONAL, path, node, true);
}
+
+ @Override
+ public String toString() {
+ return "onNodeUpdated";
+ }
});
}
}
@Override
public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) {
- final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
- .getNodeConnectorRef());
+ final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(
+ notification.getNodeConnectorRef());
- processor.enqueueOperation(new TopologyOperation() {
- @Override
- public void applyOperation(final ReadWriteTransaction transaction) {
- final TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
- removeAffectedLinks(tpId);
- }
- });
+ final TpId tpId = toTerminationPointId(getNodeConnectorKey(
+ notification.getNodeConnectorRef()).getId());
processor.enqueueOperation(new TopologyOperation() {
@Override
public void applyOperation(ReadWriteTransaction transaction) {
+ removeAffectedLinks(tpId, transaction);
transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
}
+
+ @Override
+ public String toString() {
+ return "onNodeConnectorRemoved";
+ }
});
}
@Override
public void onNodeConnectorUpdated(final NodeConnectorUpdated notification) {
- final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class);
+ final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(
+ FlowCapableNodeConnectorUpdated.class);
if (fcncu != null) {
processor.enqueueOperation(new TopologyOperation() {
@Override
transaction.merge(LogicalDatastoreType.OPERATIONAL, path, point, true);
if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
|| (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
- removeAffectedLinks(point.getTpId());
+ removeAffectedLinks(point.getTpId(), transaction);
}
}
+
+ @Override
+ public String toString() {
+ return "onNodeConnectorUpdated";
+ }
});
}
}
final InstanceIdentifier<Link> path = linkPath(link);
transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
}
+
+ @Override
+ public String toString() {
+ return "onLinkDiscovered";
+ }
});
}
public void applyOperation(final ReadWriteTransaction transaction) {
transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
}
+
+ @Override
+ public String toString() {
+ return "onLinkRemoved";
+ }
});
}
return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
}
- private void removeAffectedLinks(final NodeId id) {
- processor.enqueueOperation(new TopologyOperation() {
+ private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
+ CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
+ transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+ Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
@Override
- public void applyOperation(final ReadWriteTransaction transaction) {
- CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
- Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
- @Override
- public void onSuccess(Optional<Topology> topologyOptional) {
- if (topologyOptional.isPresent()) {
- List<Link> linkList = topologyOptional.get().getLink() != null
- ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
- for (Link link : linkList) {
- if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
- }
- }
- }
- }
+ public void onSuccess(Optional<Topology> topologyOptional) {
+ removeAffectedLinks(id, topologyOptional);
+ }
- @Override
- public void onFailure(Throwable throwable) {
- LOG.error("Error reading topology data for topology {}", topology, throwable);
- }
- });
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.error("Error reading topology data for topology {}", topology, throwable);
}
});
}
- private void removeAffectedLinks(final TpId id) {
- processor.enqueueOperation(new TopologyOperation() {
- @Override
- public void applyOperation(final ReadWriteTransaction transaction) {
- CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
- Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
- @Override
- public void onSuccess(Optional<Topology> topologyOptional) {
- if (topologyOptional.isPresent()) {
- List<Link> linkList = topologyOptional.get().getLink() != null
- ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
- for (Link link : linkList) {
- if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
- transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
- }
- }
- }
- }
+ private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+ if (!topologyOptional.isPresent()) {
+ return;
+ }
+
+ List<Link> linkList = topologyOptional.get().getLink() != null ?
+ topologyOptional.get().getLink() : Collections.<Link> emptyList();
+ final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
+ for (Link link : linkList) {
+ if (id.equals(link.getSource().getSourceNode()) ||
+ id.equals(link.getDestination().getDestNode())) {
+ linkIDsToDelete.add(linkPath(link));
+ }
+ }
+
+ enqueueLinkDeletes(linkIDsToDelete);
+ }
- @Override
- public void onFailure(Throwable throwable) {
- LOG.error("Error reading topology data for topology {}", topology, throwable);
+ private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
+ if(!linkIDsToDelete.isEmpty()) {
+ processor.enqueueOperation(new TopologyOperation() {
+ @Override
+ public void applyOperation(ReadWriteTransaction transaction) {
+ for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
}
- });
+ }
+
+ @Override
+ public String toString() {
+ return "Delete Links " + linkIDsToDelete.size();
+ }
+ });
+ }
+ }
+
+ private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
+ CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
+ transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+ Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
+ @Override
+ public void onSuccess(Optional<Topology> topologyOptional) {
+ removeAffectedLinks(id, topologyOptional);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.error("Error reading topology data for topology {}", topology, throwable);
}
});
}
+ private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional) {
+ if (!topologyOptional.isPresent()) {
+ return;
+ }
+
+ List<Link> linkList = topologyOptional.get().getLink() != null
+ ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
+ final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
+ for (Link link : linkList) {
+ if (id.equals(link.getSource().getSourceTp()) ||
+ id.equals(link.getDestination().getDestTp())) {
+ linkIDsToDelete.add(linkPath(link));
+ }
+ }
+
+ enqueueLinkDeletes(linkIDsToDelete);
+ }
+
private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
return topology.child(Node.class, new NodeKey(nodeId));
}
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
for (; ; ) {
TopologyOperation op = queue.take();
- LOG.debug("New operations available, starting transaction");
- final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+ LOG.debug("New {} operation available, starting transaction", op);
+ final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
int ops = 0;
do {
} else {
op = null;
}
+
+ LOG.debug("Next operation {}", op);
} while (op != null);
LOG.debug("Processed {} operations, submitting transaction", ops);
- final CheckedFuture txResultFuture = tx.submit();
- Futures.addCallback(txResultFuture, new FutureCallback() {
+ CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
+ Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
@Override
- public void onSuccess(Object o) {
+ public void onSuccess(Void notUsed) {
LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.md.controller.topology.manager;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscoveredBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortConfig;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.flow.capable.port.StateBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.LinkId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Destination;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.DestinationBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Source;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.SourceBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class FlowCapableTopologyExporterTest {
+
+ @Mock
+ private DataBroker mockDataBroker;
+
+ @Mock
+ private BindingTransactionChain mockTxChain;
+
+ private OperationProcessor processor;
+
+ private FlowCapableTopologyExporter exporter;
+
+ private InstanceIdentifier<Topology> topologyIID;
+
+ private final ExecutorService executor = Executors.newFixedThreadPool(1);
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(mockTxChain).when(mockDataBroker)
+ .createTransactionChain(any(TransactionChainListener.class));
+
+ processor = new OperationProcessor(mockDataBroker);
+
+ topologyIID = InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(new TopologyId("test")));
+ exporter = new FlowCapableTopologyExporter(processor, topologyIID);
+
+ executor.execute(processor);
+ }
+
+ @After
+ public void tearDown() {
+ executor.shutdownNow();
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ @Test
+ public void testOnNodeRemoved() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+ InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+ nodeKey);
+
+ List<Link> linkList = Arrays.asList(
+ newLink("link1", newSourceNode("node1"), newDestNode("dest")),
+ newLink("link2", newSourceNode("source"), newDestNode("node1")),
+ newLink("link2", newSourceNode("source2"), newDestNode("dest2")));
+ final Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+ InstanceIdentifier[] expDeletedIIDs = {
+ topologyIID.child(Link.class, linkList.get(0).getKey()),
+ topologyIID.child(Link.class, linkList.get(1).getKey()),
+ topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+ };
+
+ SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+ ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
+ doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
+ .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+
+ CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
+
+ int expDeleteCalls = expDeletedIIDs.length;
+ CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls);
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+ ArgumentCaptor.forClass(InstanceIdentifier.class);
+ setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
+
+ ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
+ setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
+ CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
+
+ doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
+
+ waitForSubmit(submitLatch1);
+
+ setReadFutureAsync(topology, readFuture);
+
+ waitForDeletes(expDeleteCalls, deleteLatch);
+
+ waitForSubmit(submitLatch2);
+
+ assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+
+ verifyMockTx(mockTx1);
+ verifyMockTx(mockTx2);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ @Test
+ public void testOnNodeRemovedWithNoTopology() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+ InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+ nodeKey);
+
+ InstanceIdentifier[] expDeletedIIDs = {
+ topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+ };
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx)
+ .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+ CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+
+ CountDownLatch deleteLatch = new CountDownLatch(1);
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+ ArgumentCaptor.forClass(InstanceIdentifier.class);
+ setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
+
+ waitForSubmit(submitLatch);
+
+ waitForDeletes(1, deleteLatch);
+
+ assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testOnNodeConnectorRemoved() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+ newInvNodeConnKey("tp1");
+
+ InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+ List<Link> linkList = Arrays.asList(
+ newLink("link1", newSourceTp("tp1"), newDestTp("dest")),
+ newLink("link2", newSourceTp("source"), newDestTp("tp1")),
+ newLink("link3", newSourceTp("source2"), newDestTp("dest2")));
+ final Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+ InstanceIdentifier[] expDeletedIIDs = {
+ topologyIID.child(Link.class, linkList.get(0).getKey()),
+ topologyIID.child(Link.class, linkList.get(1).getKey()),
+ topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+ .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1")))
+ };
+
+ final SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+ ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
+ doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
+ .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+
+ CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
+
+ int expDeleteCalls = expDeletedIIDs.length;
+ CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls);
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+ ArgumentCaptor.forClass(InstanceIdentifier.class);
+ setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
+
+ ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
+ setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
+ CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
+
+ doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
+ new NodeConnectorRef(invNodeConnID)).build());
+
+ waitForSubmit(submitLatch1);
+
+ setReadFutureAsync(topology, readFuture);
+
+ waitForDeletes(expDeleteCalls, deleteLatch);
+
+ waitForSubmit(submitLatch2);
+
+ assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+
+ verifyMockTx(mockTx1);
+ verifyMockTx(mockTx2);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testOnNodeConnectorRemovedWithNoTopology() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+ newInvNodeConnKey("tp1");
+
+ InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+ InstanceIdentifier[] expDeletedIIDs = {
+ topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+ .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1")))
+ };
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx)
+ .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+ CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+
+ CountDownLatch deleteLatch = new CountDownLatch(1);
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+ ArgumentCaptor.forClass(InstanceIdentifier.class);
+ setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
+ new NodeConnectorRef(invNodeConnID)).build());
+
+ waitForSubmit(submitLatch);
+
+ waitForDeletes(1, deleteLatch);
+
+ assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+ }
+
+ @Test
+ public void testOnNodeUpdated() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+ InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+ nodeKey);
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeUpdated(new NodeUpdatedBuilder().setNodeRef(new NodeRef(invNodeID))
+ .setId(nodeKey.getId()).addAugmentation(FlowCapableNodeUpdated.class,
+ new FlowCapableNodeUpdatedBuilder().build()).build());
+
+ waitForSubmit(submitLatch);
+
+ ArgumentCaptor<Node> mergedNode = ArgumentCaptor.forClass(Node.class);
+ NodeId expNodeId = new NodeId("node1");
+ verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(Node.class,
+ new NodeKey(expNodeId))), mergedNode.capture(), eq(true));
+ assertEquals("getNodeId", expNodeId, mergedNode.getValue().getNodeId());
+ InventoryNode augmentation = mergedNode.getValue().getAugmentation(InventoryNode.class);
+ assertNotNull("Missing augmentation", augmentation);
+ assertEquals("getInventoryNodeRef", new NodeRef(invNodeID), augmentation.getInventoryNodeRef());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testOnNodeConnectorUpdated() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+ newInvNodeConnKey("tp1");
+
+ InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+ new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+ FlowCapableNodeConnectorUpdated.class,
+ new FlowCapableNodeConnectorUpdatedBuilder().build()).build());
+
+ waitForSubmit(submitLatch);
+
+ ArgumentCaptor<TerminationPoint> mergedNode = ArgumentCaptor.forClass(TerminationPoint.class);
+ NodeId expNodeId = new NodeId("node1");
+ TpId expTpId = new TpId("tp1");
+ InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+ Node.class, new NodeKey(expNodeId)).child(TerminationPoint.class,
+ new TerminationPointKey(expTpId));
+ verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+ mergedNode.capture(), eq(true));
+ assertEquals("getTpId", expTpId, mergedNode.getValue().getTpId());
+ InventoryNodeConnector augmentation = mergedNode.getValue().getAugmentation(
+ InventoryNodeConnector.class);
+ assertNotNull("Missing augmentation", augmentation);
+ assertEquals("getInventoryNodeConnectorRef", new NodeConnectorRef(invNodeConnID),
+ augmentation.getInventoryNodeConnectorRef());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testOnNodeConnectorUpdatedWithLinkStateDown() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+ newInvNodeConnKey("tp1");
+
+ InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+ List<Link> linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest")));
+ Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx)
+ .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+ setupStubbedSubmit(mockTx);
+
+ CountDownLatch deleteLatch = new CountDownLatch(1);
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+ ArgumentCaptor.forClass(InstanceIdentifier.class);
+ setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+ new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+ FlowCapableNodeConnectorUpdated.class,
+ new FlowCapableNodeConnectorUpdatedBuilder().setState(
+ new StateBuilder().setLinkDown(true).build()).build()).build());
+
+ waitForDeletes(1, deleteLatch);
+
+ InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+ Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class,
+ new TerminationPointKey(new TpId("tp1")));
+
+ verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+ any(TerminationPoint.class), eq(true));
+
+ assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class,
+ linkList.get(0).getKey())}, deletedLinkIDs);
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testOnNodeConnectorUpdatedWithPortDown() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ nodeKey = newInvNodeKey("node1");
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+ newInvNodeConnKey("tp1");
+
+ InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+ List<Link> linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest")));
+ Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx)
+ .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+ setupStubbedSubmit(mockTx);
+
+ CountDownLatch deleteLatch = new CountDownLatch(1);
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+ ArgumentCaptor.forClass(InstanceIdentifier.class);
+ setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+ new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+ FlowCapableNodeConnectorUpdated.class,
+ new FlowCapableNodeConnectorUpdatedBuilder().setConfiguration(
+ new PortConfig(true, true, true, true)).build()).build());
+
+ waitForDeletes(1, deleteLatch);
+
+ InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+ Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class,
+ new TerminationPointKey(new TpId("tp1")));
+
+ verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+ any(TerminationPoint.class), eq(true));
+
+ assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class,
+ linkList.get(0).getKey())}, deletedLinkIDs);
+ }
+
+ @Test
+ public void testOnLinkDiscovered() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ sourceNodeKey = newInvNodeKey("sourceNode");
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+ sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+ InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ destNodeKey = newInvNodeKey("destNode");
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+ destNodeConnKey = newInvNodeConnKey("destTP");
+ InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onLinkDiscovered(new LinkDiscoveredBuilder().setSource(
+ new NodeConnectorRef(sourceConnID)).setDestination(
+ new NodeConnectorRef(destConnID)).build());
+
+ waitForSubmit(submitLatch);
+
+ ArgumentCaptor<Link> mergedNode = ArgumentCaptor.forClass(Link.class);
+ verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
+ Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
+ mergedNode.capture(), eq(true));
+ assertEquals("Source node ID", "sourceNode",
+ mergedNode.getValue().getSource().getSourceNode().getValue());
+ assertEquals("Dest TP ID", "sourceTP",
+ mergedNode.getValue().getSource().getSourceTp().getValue());
+ assertEquals("Dest node ID", "destNode",
+ mergedNode.getValue().getDestination().getDestNode().getValue());
+ assertEquals("Dest TP ID", "destTP",
+ mergedNode.getValue().getDestination().getDestTp().getValue());
+ }
+
+ @Test
+ public void testOnLinkRemoved() {
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ sourceNodeKey = newInvNodeKey("sourceNode");
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+ sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+ InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ destNodeKey = newInvNodeKey("destNode");
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+ destNodeConnKey = newInvNodeConnKey("destTP");
+ InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+ ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+ CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+ doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+ exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
+ new NodeConnectorRef(sourceConnID)).setDestination(
+ new NodeConnectorRef(destConnID)).build());
+
+ waitForSubmit(submitLatch);
+
+ verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+ Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+ }
+
+ private void verifyMockTx(ReadWriteTransaction mockTx) {
+ InOrder inOrder = inOrder(mockTx);
+ inOrder.verify(mockTx, atLeast(0)).submit();
+ inOrder.verify(mockTx, never()).delete(eq(LogicalDatastoreType.OPERATIONAL),
+ any(InstanceIdentifier.class));
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void assertDeletedIDs(InstanceIdentifier[] expDeletedIIDs,
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs) {
+ Set<InstanceIdentifier> actualIIDs = new HashSet<>(deletedLinkIDs.getAllValues());
+ for(InstanceIdentifier id: expDeletedIIDs) {
+ assertTrue("Missing expected deleted IID " + id, actualIIDs.contains(id));
+ }
+ }
+
+ private void setReadFutureAsync(final Topology topology,
+ final SettableFuture<Optional<Topology>> readFuture) {
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ readFuture.set(Optional.of(topology));
+ }
+
+ }.start();
+ }
+
+ private void waitForSubmit(CountDownLatch latch) {
+ assertEquals("Transaction submitted", true,
+ Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
+ }
+
+ private void waitForDeletes(int expDeleteCalls, final CountDownLatch latch) {
+ boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
+ if(!done) {
+ fail("Expected " + expDeleteCalls + " delete calls. Actual: " +
+ (expDeleteCalls - latch.getCount()));
+ }
+ }
+
+ private CountDownLatch setupStubbedSubmit(ReadWriteTransaction mockTx) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(new Answer<CheckedFuture<Void, TransactionCommitFailedException>>() {
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> answer(
+ InvocationOnMock invocation) {
+ latch.countDown();
+ return Futures.immediateCheckedFuture(null);
+ }
+ }).when(mockTx).submit();
+
+ return latch;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void setupStubbedDeletes(ReadWriteTransaction mockTx,
+ ArgumentCaptor<InstanceIdentifier> deletedLinkIDs, final CountDownLatch latch) {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ latch.countDown();
+ return null;
+ }
+ }).when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), deletedLinkIDs.capture());
+ }
+
+ private org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+ newInvNodeKey(String id) {
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey =
+ new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey(
+ new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.
+ rev130819.NodeId(id));
+ return nodeKey;
+ }
+
+ private NodeConnectorKey newInvNodeConnKey(String id) {
+ return new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey(
+ new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.
+ NodeConnectorId(id));
+ }
+
+ private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> newNodeConnID(
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey,
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey) {
+ return InstanceIdentifier.create(Nodes.class).child(
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+ nodeKey).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.
+ rev130819.node.NodeConnector.class, ncKey);
+ }
+
+ private Link newLink(String id, Source source, Destination dest) {
+ return new LinkBuilder().setLinkId(new LinkId(id))
+ .setSource(source).setDestination(dest).build();
+ }
+
+ private Destination newDestTp(String id) {
+ return new DestinationBuilder().setDestTp(new TpId(id)).build();
+ }
+
+ private Source newSourceTp(String id) {
+ return new SourceBuilder().setSourceTp(new TpId(id)).build();
+ }
+
+ private Destination newDestNode(String id) {
+ return new DestinationBuilder().setDestNode(new NodeId(id)).build();
+ }
+
+ private Source newSourceNode(String id) {
+ return new SourceBuilder().setSourceNode(new NodeId(id)).build();
+ }
+}