From: Moiz Raja Date: Mon, 9 Feb 2015 18:11:26 +0000 (+0000) Subject: Merge "Bug 2410: Use generated serialVersionUID for messages" X-Git-Tag: release/lithium~609 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=664fc9940569ebfd54dcfc3db87bab66fad9300e;hp=ce51608c403fd3ca5989afb4a4667e125f722fb4 Merge "Bug 2410: Use generated serialVersionUID for messages" --- diff --git a/karaf/karaf-parent/pom.xml b/karaf/karaf-parent/pom.xml index 06d8c8d99b..baf67302e0 100644 --- a/karaf/karaf-parent/pom.xml +++ b/karaf/karaf-parent/pom.xml @@ -19,9 +19,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html karaf-parent ${project.artifactId} pom - - 3.1.1 - + 1.1.0-SNAPSHOT 1.5.0-SNAPSHOT @@ -320,6 +318,26 @@ and is available at http://www.eclipse.org/legal/epl-v10.html + + org.apache.maven.plugins + maven-enforcer-plugin + ${enforcer.version} + + + enforce-maven + + enforce + + + + + 3.1.1 + + + + + + maven-resources-plugin diff --git a/karaf/opendaylight-karaf-empty/pom.xml b/karaf/opendaylight-karaf-empty/pom.xml index a13023cbee..aa772096cd 100644 --- a/karaf/opendaylight-karaf-empty/pom.xml +++ b/karaf/opendaylight-karaf-empty/pom.xml @@ -9,9 +9,6 @@ opendaylight-karaf-empty pom - - 3.0 - diff --git a/karaf/opendaylight-karaf/pom.xml b/karaf/opendaylight-karaf/pom.xml index 9bf2309b6f..3b29fa276e 100644 --- a/karaf/opendaylight-karaf/pom.xml +++ b/karaf/opendaylight-karaf/pom.xml @@ -9,9 +9,6 @@ distribution.opendaylight-karaf pom - - 3.0 - diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml index 67cc60ab0f..49b43f442e 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml @@ -13,6 +13,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL org.opendaylight.odlparent features-parent 1.5.0-SNAPSHOT + ${groupId} ${artifactId}-features diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-impl/src/main/config/default-config.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-impl/src/main/config/default-config.xml index db4efb83e3..e777fd25bc 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-impl/src/main/config/default-config.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-impl/src/main/config/default-config.xml @@ -12,6 +12,8 @@ and is available at http://www.eclipse.org/legal/epl-v10.html --> + urn:opendaylight:params:xml:ns:yang:${artifactId}:impl?module=${artifactId}-impl&revision=2014-12-10 + urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding?module=opendaylight-md-sal-binding&revision=2013-10-28 diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml index ae209c1bbd..3221efd362 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml @@ -22,4 +22,23 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL ${artifactId}-features ${artifactId}-artifacts + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.apache.maven.plugins + maven-install-plugin + + true + + + + diff --git a/opendaylight/config/config-plugin-parent/pom.xml b/opendaylight/config/config-plugin-parent/pom.xml index 67370e1e2f..081df0c52d 100644 --- a/opendaylight/config/config-plugin-parent/pom.xml +++ b/opendaylight/config/config-plugin-parent/pom.xml @@ -11,9 +11,6 @@ config-plugin-parent pom ${project.artifactId} - - 3.0.4 - diff --git a/opendaylight/config/logback-config-loader/pom.xml b/opendaylight/config/logback-config-loader/pom.xml index 0f379fbe21..94f7f8fc49 100644 --- a/opendaylight/config/logback-config-loader/pom.xml +++ b/opendaylight/config/logback-config-loader/pom.xml @@ -11,9 +11,6 @@ logback-config-loader bundle ${project.artifactId} - - 3.0.4 - diff --git a/opendaylight/config/logback-config/pom.xml b/opendaylight/config/logback-config/pom.xml index d918fd7ab7..d4537387aa 100644 --- a/opendaylight/config/logback-config/pom.xml +++ b/opendaylight/config/logback-config/pom.xml @@ -11,9 +11,6 @@ logback-config bundle ${project.artifactId} - - 3.0.4 - diff --git a/opendaylight/config/netty-config-api/pom.xml b/opendaylight/config/netty-config-api/pom.xml index a5c0831fb8..2d8145723d 100644 --- a/opendaylight/config/netty-config-api/pom.xml +++ b/opendaylight/config/netty-config-api/pom.xml @@ -11,9 +11,6 @@ netty-config-api bundle ${project.artifactId} - - 3.0.4 - diff --git a/opendaylight/config/netty-event-executor-config/pom.xml b/opendaylight/config/netty-event-executor-config/pom.xml index 6188aed898..31940b91fe 100644 --- a/opendaylight/config/netty-event-executor-config/pom.xml +++ b/opendaylight/config/netty-event-executor-config/pom.xml @@ -12,9 +12,6 @@ bundle ${project.artifactId} Configuration Wrapper around netty's event executor - - 3.0.4 - diff --git a/opendaylight/config/netty-threadgroup-config/pom.xml b/opendaylight/config/netty-threadgroup-config/pom.xml index 2f3d26dd2f..0f645015e1 100644 --- a/opendaylight/config/netty-threadgroup-config/pom.xml +++ b/opendaylight/config/netty-threadgroup-config/pom.xml @@ -14,9 +14,6 @@ bundle ${project.artifactId} Configuration Wrapper around netty's event group - - 3.0.4 - diff --git a/opendaylight/config/netty-timer-config/pom.xml b/opendaylight/config/netty-timer-config/pom.xml index 75b4709da2..181c1d0151 100644 --- a/opendaylight/config/netty-timer-config/pom.xml +++ b/opendaylight/config/netty-timer-config/pom.xml @@ -12,9 +12,6 @@ bundle ${project.artifactId} Configuration Wrapper around netty's timer - - 3.0.4 - diff --git a/opendaylight/config/pom.xml b/opendaylight/config/pom.xml index 4c4c5b3378..6d14ad3957 100644 --- a/opendaylight/config/pom.xml +++ b/opendaylight/config/pom.xml @@ -13,9 +13,7 @@ 0.3.0-SNAPSHOT pom ${project.artifactId} - - 3.0.4 - + config-api config-manager diff --git a/opendaylight/config/threadpool-config-api/pom.xml b/opendaylight/config/threadpool-config-api/pom.xml index 5f0c941a19..9dc7bf5976 100644 --- a/opendaylight/config/threadpool-config-api/pom.xml +++ b/opendaylight/config/threadpool-config-api/pom.xml @@ -11,9 +11,6 @@ threadpool-config-api bundle ${project.artifactId} - - 3.0.4 - diff --git a/opendaylight/config/threadpool-config-impl/pom.xml b/opendaylight/config/threadpool-config-impl/pom.xml index 2787b30df4..b875f5f3e4 100644 --- a/opendaylight/config/threadpool-config-impl/pom.xml +++ b/opendaylight/config/threadpool-config-impl/pom.xml @@ -11,9 +11,6 @@ threadpool-config-impl bundle ${project.artifactId} - - 3.0.4 - diff --git a/opendaylight/config/yang-test-plugin/pom.xml b/opendaylight/config/yang-test-plugin/pom.xml index d03cff305b..690f8d24e6 100644 --- a/opendaylight/config/yang-test-plugin/pom.xml +++ b/opendaylight/config/yang-test-plugin/pom.xml @@ -12,9 +12,6 @@ ${project.artifactId} Remove generated source files, after new files generation, implementation is inserted. - - 3.0.4 - diff --git a/opendaylight/config/yang-test/pom.xml b/opendaylight/config/yang-test/pom.xml index 5977325574..f5c966d5a6 100644 --- a/opendaylight/config/yang-test/pom.xml +++ b/opendaylight/config/yang-test/pom.xml @@ -14,9 +14,6 @@ ${project.artifactId} Artifact that contains only generated code from yang files. Suitable for testing. - - 3.0.4 - diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index c256c822a4..766b80e73d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -107,14 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private CaptureSnapshot captureSnapshot = null; - private volatile boolean hasSnapshotCaptureInitiated = false; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; - - public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -179,7 +175,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onRecoveredSnapshot(SnapshotOffer offer) { if(LOG.isDebugEnabled()) { - LOG.debug("SnapshotOffer called.."); + LOG.debug("{}: SnapshotOffer called..", persistenceId()); } initRecoveryTimer(); @@ -209,7 +205,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { if(LOG.isDebugEnabled()) { - LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex()); + LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex()); } replicatedLog.append(logEntry); @@ -217,8 +213,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onRecoveredApplyLogEntries(ApplyLogEntries ale) { if(LOG.isDebugEnabled()) { - LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}", - context.getLastApplied() + 1, ale.getToIndex()); + LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", + persistenceId(), context.getLastApplied() + 1, ale.getToIndex()); } for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { @@ -289,8 +285,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { ApplyState applyState = (ApplyState) message; if(LOG.isDebugEnabled()) { - LOG.debug("Applying state for log index {} data {}", - applyState.getReplicatedLogEntry().getIndex(), + LOG.debug("{}: Applying state for log index {} data {}", + persistenceId(), applyState.getReplicatedLogEntry().getIndex(), applyState.getReplicatedLogEntry().getData()); } @@ -300,7 +296,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof ApplyLogEntries){ ApplyLogEntries ale = (ApplyLogEntries) message; if(LOG.isDebugEnabled()) { - LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex()); + LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex()); } persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { @Override @@ -312,8 +308,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); if(LOG.isDebugEnabled()) { - LOG.debug("ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + LOG.debug("{}: ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm() ); } @@ -333,7 +329,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotSuccess) { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; - LOG.info("SaveSnapshotSuccess received for snapshot"); + LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId()); long sequenceNumber = success.metadata().sequenceNr(); @@ -342,19 +338,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString()); - LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:"); + LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:", + persistenceId()); context.getReplicatedLog().snapshotRollback(); - LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", + LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), context.getReplicatedLog().size()); } else if (message instanceof CaptureSnapshot) { - LOG.info("CaptureSnapshot received by actor"); + LOG.info("{}: CaptureSnapshot received by actor", persistenceId()); if(captureSnapshot == null) { captureSnapshot = (CaptureSnapshot)message; @@ -368,7 +364,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!(message instanceof AppendEntriesMessages.AppendEntries) && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { if(LOG.isDebugEnabled()) { - LOG.debug("onReceiveCommand: message: {}", message.getClass()); + LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass()); } } @@ -414,7 +410,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getTermInformation().getCurrentTerm(), data); if(LOG.isDebugEnabled()) { - LOG.debug("Persist data {}", replicatedLogEntry); + LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); } final RaftActorContext raftContext = getRaftActorContext(); @@ -436,12 +432,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self()); // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot - if(!hasSnapshotCaptureInitiated){ + if(!context.isSnapshotCaptureInitiated()){ raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), raftContext.getTermInformation().getCurrentTerm()); raftContext.getReplicatedLog().snapshotCommit(); } else { - LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId()); + LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress", + persistenceId(), getId()); } } else if (clientActor != null) { // Send message for replication @@ -652,15 +649,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } String peerAddress = context.getPeerAddress(leaderId); if(LOG.isDebugEnabled()) { - LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}", - leaderId, peerAddress); + LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", + persistenceId(), leaderId, peerAddress); } return peerAddress; } private void handleCaptureSnapshotReply(byte[] snapshotBytes) { - LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length); + LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. @@ -672,7 +669,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { persistence().saveSnapshot(sn); - LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); //be greedy and remove entries from in-mem journal which are in the snapshot // and update snapshotIndex and snapshotTerm without waiting for the success, @@ -681,8 +678,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + - "and term:{}", captureSnapshot.getLastAppliedIndex(), + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { @@ -692,7 +689,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } captureSnapshot = null; - hasSnapshotCaptureInitiated = false; + context.setSnapshotCaptureInitiated(false); } protected boolean hasFollowers(){ @@ -751,7 +748,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final Procedure callback) { if(LOG.isDebugEnabled()) { - LOG.debug("Append log entry and persist {} ", replicatedLogEntry); + LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry); } // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs @@ -793,13 +790,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; // when a snaphsot is being taken, captureSnapshot != null - if (hasSnapshotCaptureInitiated == false && + if (!context.isSnapshotCaptureInitiated() && ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || dataSizeForCheck > dataThreshold)) { dataSizeSinceLastSnapshot = 0; - LOG.info("Initiating Snapshot Capture.."); + LOG.info("{}: Initiating Snapshot Capture..", persistenceId()); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -813,18 +810,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot Capture logSize: {}", journal.size()); - LOG.debug("Snapshot Capture lastApplied:{} ", - context.getLastApplied()); - LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); - LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size()); + LOG.debug("{}: Snapshot Capture lastApplied:{} ", + persistenceId(), context.getLastApplied()); + LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(), + lastAppliedIndex); + LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(), + lastAppliedTerm); } // send a CaptureSnapshot to self to make the expensive operation async. getSelf().tell(new CaptureSnapshot( lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), null); - hasSnapshotCaptureInitiated = true; + context.setSnapshotCaptureInitiated(true); } if(callback != null){ callback.apply(replicatedLogEntry); @@ -869,7 +868,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void update(long currentTerm, String votedFor) { if(LOG.isDebugEnabled()) { - LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor); } this.currentTerm = currentTerm; this.votedFor = votedFor; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 0eb4b73779..0e1f20b246 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -89,7 +89,7 @@ public interface RaftActorContext { * * @param replicatedLog */ - public void setReplicatedLog(ReplicatedLog replicatedLog); + void setReplicatedLog(ReplicatedLog replicatedLog); /** * @return A representation of the log @@ -137,7 +137,7 @@ public interface RaftActorContext { * * @param name */ - public void removePeer(String name); + void removePeer(String name); /** * Given a peerId return the corresponding actor @@ -165,5 +165,10 @@ public interface RaftActorContext { /** * @return ConfigParams */ - public ConfigParams getConfigParams(); + ConfigParams getConfigParams(); + + void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated); + + boolean isSnapshotCaptureInitiated(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index e4aef0a844..5438fe7c48 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -14,7 +14,6 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActorContext; import akka.event.LoggingAdapter; - import java.util.Map; import static com.google.common.base.Preconditions.checkState; @@ -41,6 +40,8 @@ public class RaftActorContextImpl implements RaftActorContext { private final ConfigParams configParams; + private boolean snapshotCaptureInitiated; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, @@ -130,6 +131,16 @@ public class RaftActorContextImpl implements RaftActorContext { return configParams; } + @Override + public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { + this.snapshotCaptureInitiated = snapshotCaptureInitiated; + } + + @Override + public boolean isSnapshotCaptureInitiated() { + return snapshotCaptureInitiated; + } + @Override public void addToPeers(String name, String address) { peerAddresses.put(name, address); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 462c94ec8a..410dcee5e5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -93,6 +93,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; + private long replicatedToAllIndex = -1; + public AbstractLeader(RaftActorContext context) { super(context); @@ -109,7 +111,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { leaderId = context.getId(); - LOG.debug("Election:Leader has following peers: {}", getFollowerIds()); + LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds()); minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); @@ -153,7 +155,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntries.toString()); + LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); } return this; @@ -165,7 +167,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(! appendEntriesReply.isSuccess()) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntriesReply.toString()); + LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply); } } @@ -175,7 +177,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - LOG.error("Unknown follower {}", followerId); + LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId); return this; } @@ -226,9 +228,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } + if (!context.isSnapshotCaptureInitiated()) { + purgeInMemoryLog(); + } + return this; } + private void purgeInMemoryLog() { + //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + // we would delete the in-mem log from that index on, in-order to minimize mem usage + // we would also share this info thru AE with the followers so that they can delete their log entries as well. + long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + for (FollowerLogInformation info : followerToLog.values()) { + minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); + } + + replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + } + @Override protected ClientRequestTracker removeClientRequestTracker(long logIndex) { final Iterator it = trackerList.iterator(); @@ -276,6 +294,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(), + rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); return switchBehavior(new Follower(context)); @@ -312,19 +333,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot == null) { + LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + context.getId(), followerId); + return; + } + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); followerLogInformation.markFollowerActive(); - if (followerToSnapshot != null && - followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { - + if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { - LOG.debug("InstallSnapshotReply received, " + + LOG.debug("{}: InstallSnapshotReply received, " + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - reply.getChunkIndex(), followerId, + context.getId(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1 ); } @@ -336,8 +362,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.remove(followerId); if(LOG.isDebugEnabled()) { - LOG.debug("followerToLog.get(followerId).getNextIndex()=" + - followerToLog.get(followerId).getNextIndex()); + LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" + + context.getId(), followerToLog.get(followerId).getNextIndex()); } if (mapFollowerToSnapshot.isEmpty()) { @@ -350,20 +376,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(true); } } else { - LOG.info("InstallSnapshotReply received, " + - "sending snapshot chunk failed, Will retry, Chunk:{}", - reply.getChunkIndex() - ); + LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", + context.getId(), reply.getChunkIndex()); followerToSnapshot.markSendStatus(false); } - } else { - LOG.error("ERROR!!" + - "FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - followerToSnapshot.getChunkIndex(), reply.getChunkIndex() - ); + LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + context.getId(), reply.getChunkIndex(), followerId, + followerToSnapshot.getChunkIndex()); if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot @@ -377,7 +398,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long logIndex = replicate.getReplicatedLogEntry().getIndex(); if(LOG.isDebugEnabled()) { - LOG.debug("Replicate message {}", logIndex); + LOG.debug("{}: Replicate message {}", context.getId(), logIndex); } // Create a tracker entry we will use this later to notify the @@ -398,6 +419,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers + for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -407,14 +429,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); - if (mapFollowerToSnapshot.get(followerId) != null) { + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + if (followerToSnapshot != null) { // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + if (isFollowerActive && followerToSnapshot.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); } else { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList()); + Collections.emptyList(), followerId); } } else { @@ -422,8 +445,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); final List entries; - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { + LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + context.getId(), leaderLastIndex, leaderSnapShotIndex); + + if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { + LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(), + followerNextIndex, followerId); + // FIXME : Sending one entry at a time entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); @@ -434,11 +462,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // then snapshot should be sent if(LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + "follower-nextIndex: %s, leader-snapshot-index: %s, " + + "leader-last-index: %s", context.getId(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } actor().tell(new InitiateInstallSnapshot(), actor()); @@ -451,22 +478,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { entries = Collections.emptyList(); } - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); - + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); } } } } private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, - List entries) { - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); + List entries, String followerId) { + AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex(), replicatedToAllIndex); + + if(!entries.isEmpty()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, + appendEntries); + } + + followerActor.tell(appendEntries.toSerializable(), actor()); } /** @@ -486,6 +516,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * */ private void installSnapshotIfNeeded() { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet()); + } + for (Entry e : followerToLog.entrySet()) { final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); @@ -493,8 +527,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{} follower needs a snapshot install", e.getKey()); + context.getReplicatedLog().isInSnapshot(nextIndex)) { + LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey()); if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot @@ -516,7 +550,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // on every install snapshot, we try to capture the snapshot. // Once a capture is going on, another one issued will get ignored by RaftActor. private void initiateCaptureSnapshot() { - LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); + LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -558,23 +592,30 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + + // Note: the previous call to getNextSnapshotChunk has the side-effect of adding + // followerId to the followerToSnapshot map. + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId,snapshot.get()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks(), - Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode()) + nextSnapshotChunk, + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), + Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() ); - LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", - followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", + context.getId(), followerActor.path(), + followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); } } catch (IOException e) { - LOG.error(e, "InstallSnapshot failed for Leader."); + LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); } } @@ -590,7 +631,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } ByteString nextChunk = followerToSnapshot.getNextChunk(); if (LOG.isDebugEnabled()) { - LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size()); } return nextChunk; } @@ -654,14 +695,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * snapshot chunks */ protected class FollowerToSnapshot { - private ByteString snapshotBytes; + private final ByteString snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset private int replyReceivedForOffset; // if replyStatus is false, the previous chunk is attempted private boolean replyStatus = false; private int chunkIndex; - private int totalChunks; + private final int totalChunks; private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; @@ -671,8 +712,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot {} bytes, total chunks to send:{}", - size, totalChunks); + LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", + context.getId(), size, totalChunks); } replyReceivedForOffset = -1; chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; @@ -741,7 +782,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - LOG.debug("length={}, offset={},size={}", + LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(), snapshotLength, start, size); } ByteString substring = getSnapshotBytes().substring(start, start + size); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 04462be042..99824b0bb4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -94,8 +94,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { if(LOG.isDebugEnabled()) { - LOG.debug("Cannot append entries because sender term {} is less than {}", - appendEntries.getTerm(), currentTerm()); + LOG.debug("{}: Cannot append entries because sender term {} is less than {}", + context.getId(), appendEntries.getTerm(), currentTerm()); } sender.tell( @@ -136,7 +136,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { RequestVote requestVote) { if(LOG.isDebugEnabled()) { - LOG.debug(requestVote.toString()); + LOG.debug("{}: Received {}", context.getId(), requestVote); } boolean grantVote = false; @@ -350,12 +350,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { //if one index is not present in the log, no point in looping // around as the rest wont be present either LOG.warning( - "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index); + "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + context.getId(), i, i, index); break; } } if(LOG.isDebugEnabled()) { - LOG.debug("Setting last applied to {}", newLastApplied); + LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied); } context.setLastApplied(newLastApplied); @@ -393,7 +394,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { try { close(); } catch (Exception e) { - LOG.error(e, "Failed to close behavior : {}", this.state()); + LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state()); } return behavior; @@ -421,4 +422,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return numMajority; } + + protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) { + + // we would want to keep the lastApplied as its used while capturing snapshots + long tempMin = Math.min(minReplicatedToAllIndex, + (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1)); + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm()); + context.getReplicatedLog().snapshotCommit(); + return tempMin; + } + return currentReplicatedIndex; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 702417273f..09ffe056c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import java.util.Set; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -19,8 +20,6 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import java.util.Set; - /** * The behavior of a RaftActor when it is in the CandidateState *

@@ -53,7 +52,7 @@ public class Candidate extends AbstractRaftActorBehavior { peers = context.getPeerAddresses().keySet(); if(LOG.isDebugEnabled()) { - LOG.debug("Election:Candidate has following peers: {}", peers); + LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers); } votesRequired = getMajorityVoteCount(peers.size()); @@ -66,7 +65,7 @@ public class Candidate extends AbstractRaftActorBehavior { AppendEntries appendEntries) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntries.toString()); + LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); } return this; @@ -106,7 +105,8 @@ public class Candidate extends AbstractRaftActorBehavior { RaftRPC rpc = (RaftRPC) message; if(LOG.isDebugEnabled()) { - LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm()); + LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc, + context.getTermInformation().getCurrentTerm()); } // If RPC request or response contains term T > currentTerm: @@ -150,7 +150,7 @@ public class Candidate extends AbstractRaftActorBehavior { context.getId()); if(LOG.isDebugEnabled()) { - LOG.debug("Starting new term {}", (currentTerm + 1)); + LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1)); } // Request for a vote diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index cc2e55d51b..410b3c266c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -77,7 +77,7 @@ public class Follower extends AbstractRaftActorBehavior { if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntries.toString()); + LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); } } @@ -109,8 +109,8 @@ public class Follower extends AbstractRaftActorBehavior { // it's log. if(LOG.isDebugEnabled()) { - LOG.debug("The followers log is empty and the senders prevLogIndex is {}", - appendEntries.getPrevLogIndex()); + LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}", + context.getId(), appendEntries.getPrevLogIndex()); } } else if (lastIndex() > -1 @@ -121,8 +121,8 @@ public class Follower extends AbstractRaftActorBehavior { // prevLogIndex entry was not found in it's log if(LOG.isDebugEnabled()) { - LOG.debug("The log is not empty but the prevLogIndex {} was not found in it", - appendEntries.getPrevLogIndex()); + LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it", + context.getId(), appendEntries.getPrevLogIndex()); } } else if (lastIndex() > -1 @@ -135,8 +135,8 @@ public class Follower extends AbstractRaftActorBehavior { if (LOG.isDebugEnabled()) { LOG.debug( - "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}" - , prevLogTerm + "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}" + , context.getId(), prevLogTerm , appendEntries.getPrevLogTerm()); } } else { @@ -147,9 +147,9 @@ public class Follower extends AbstractRaftActorBehavior { // We found that the log was out of sync so just send a negative // reply and return if(LOG.isDebugEnabled()) { - LOG.debug("Follower ({}) is out-of-sync, " + + LOG.debug("{}: Follower ({}) is out-of-sync, " + "so sending negative reply, lastIndex():{}, lastTerm():{}", - context.getId(), lastIndex(), lastTerm() + context.getId(), context.getId(), lastIndex(), lastTerm() ); } sender.tell( @@ -162,9 +162,8 @@ public class Follower extends AbstractRaftActorBehavior { if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { if(LOG.isDebugEnabled()) { - LOG.debug( - "Number of entries to be appended = {}", appendEntries.getEntries().size() - ); + LOG.debug("{}: Number of entries to be appended = {}", context.getId(), + appendEntries.getEntries().size()); } // 3. If an existing entry conflicts with a new one (same index @@ -189,9 +188,8 @@ public class Follower extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - LOG.debug( - "Removing entries from log starting at {}", matchEntry.getIndex() - ); + LOG.debug("{}: Removing entries from log starting at {}", context.getId(), + matchEntry.getIndex()); } // Entries do not match so remove all subsequent entries @@ -202,8 +200,8 @@ public class Follower extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex()) - ); + LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(), + (addEntriesFrom + lastIndex())); } // 4. Append any new entries not already in the log @@ -211,13 +209,14 @@ public class Follower extends AbstractRaftActorBehavior { i < appendEntries.getEntries().size(); i++) { if(LOG.isDebugEnabled()) { - LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData()); + LOG.debug("{}: Append entry to log {}", context.getId(), + appendEntries.getEntries().get(i).getData()); } context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i)); } if(LOG.isDebugEnabled()) { - LOG.debug("Log size is now {}", context.getReplicatedLog().size()); + LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size()); } } @@ -232,7 +231,7 @@ public class Follower extends AbstractRaftActorBehavior { if (prevCommitIndex != context.getCommitIndex()) { if(LOG.isDebugEnabled()) { - LOG.debug("Commit index set to {}", context.getCommitIndex()); + LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex()); } } @@ -242,9 +241,9 @@ public class Follower extends AbstractRaftActorBehavior { if (appendEntries.getLeaderCommit() > context.getLastApplied() && context.getLastApplied() < lastIndex()) { if(LOG.isDebugEnabled()) { - LOG.debug("applyLogToStateMachine, " + + LOG.debug("{}: applyLogToStateMachine, " + "appendEntries.getLeaderCommit():{}," + - "context.getLastApplied():{}, lastIndex():{}", + "context.getLastApplied():{}, lastIndex():{}", context.getId(), appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex() ); } @@ -255,6 +254,10 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex(), lastTerm()), actor()); + if (!context.isSnapshotCaptureInitiated()) { + fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex()); + } + return this; } @@ -302,8 +305,8 @@ public class Follower extends AbstractRaftActorBehavior { private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { if(LOG.isDebugEnabled()) { - LOG.debug("InstallSnapshot received by follower " + - "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(), + LOG.debug("{}: InstallSnapshot received by follower " + + "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(), installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks() ); } @@ -339,8 +342,7 @@ public class Follower extends AbstractRaftActorBehavior { snapshotTracker = null; } catch (Exception e){ - - LOG.error(e, "Exception in InstallSnapshot of follower:"); + LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId()); //send reply with success as false. The chunk will be sent again on failure sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), installSnapshot.getChunkIndex(), false), actor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index ee3cc65ddd..fcfaee3603 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -57,8 +57,8 @@ public class Leader extends AbstractLeader { if (originalMessage instanceof IsolatedLeaderCheck) { if (isLeaderIsolated()) { - LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", - minIsolatedLeaderPeerCount, leaderId); + LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", + context.getId(), minIsolatedLeaderPeerCount, leaderId); return switchBehavior(new IsolatedLeader(context)); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 8198106217..97bcd6a708 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -50,14 +50,18 @@ public class AppendEntries extends AbstractRaftRPC { // leader's commitIndex private final long leaderCommit; + // index which has been replicated successfully to all followers, -1 if none + private final long replicatedToAllIndex; + public AppendEntries(long term, String leaderId, long prevLogIndex, - long prevLogTerm, List entries, long leaderCommit) { + long prevLogTerm, List entries, long leaderCommit, long replicatedToAllIndex) { super(term); this.leaderId = leaderId; this.prevLogIndex = prevLogIndex; this.prevLogTerm = prevLogTerm; this.entries = entries; this.leaderCommit = leaderCommit; + this.replicatedToAllIndex = replicatedToAllIndex; } private void writeObject(ObjectOutputStream out) throws IOException { @@ -102,6 +106,10 @@ public class AppendEntries extends AbstractRaftRPC { return leaderCommit; } + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + @Override public String toString() { final StringBuilder sb = @@ -112,6 +120,7 @@ public class AppendEntries extends AbstractRaftRPC { sb.append(", prevLogTerm=").append(prevLogTerm); sb.append(", entries=").append(entries); sb.append(", leaderCommit=").append(leaderCommit); + sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex); sb.append('}'); return sb.toString(); } @@ -203,7 +212,7 @@ public class AppendEntries extends AbstractRaftRPC { from.getPrevLogIndex(), from.getPrevLogTerm(), logEntryList, - from.getLeaderCommit()); + from.getLeaderCommit(), -1); return to; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index d53ccf2500..ffd8edfbe1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -128,6 +128,33 @@ public class AbstractReplicatedLogImplTest { } + @Test + public void testSnapshotPreCommit() { + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H"))); + + replicatedLogImpl.snapshotPreCommit(4, 3); + assertEquals(3, replicatedLogImpl.size()); + assertEquals(4, replicatedLogImpl.getSnapshotIndex()); + + replicatedLogImpl.snapshotPreCommit(6, 3); + assertEquals(1, replicatedLogImpl.size()); + assertEquals(6, replicatedLogImpl.getSnapshotIndex()); + + replicatedLogImpl.snapshotPreCommit(7, 3); + assertEquals(0, replicatedLogImpl.size()); + assertEquals(7, replicatedLogImpl.getSnapshotIndex()); + + //running it again on an empty list should not throw exception + replicatedLogImpl.snapshotPreCommit(7, 3); + assertEquals(0, replicatedLogImpl.size()); + assertEquals(7, replicatedLogImpl.getSnapshotIndex()); + + + } + // create a snapshot for test public Map takeSnapshot(final int numEntries) { Map map = new HashMap<>(numEntries); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index cd852eaae2..9d3e5dcb12 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -34,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext { private ReplicatedLog replicatedLog; private Map peerAddresses = new HashMap<>(); private ConfigParams configParams; + private boolean snapshotCaptureInitiated; public MockRaftActorContext(){ electionTerm = null; @@ -185,6 +186,16 @@ public class MockRaftActorContext implements RaftActorContext { return configParams; } + @Override + public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { + this.snapshotCaptureInitiated = snapshotCaptureInitiated; + } + + @Override + public boolean isSnapshotCaptureInitiated() { + return snapshotCaptureInitiated; + } + public void setConfigParams(ConfigParams configParams) { this.configParams = configParams; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 6b266d710e..30893810f5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,17 +1,5 @@ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; @@ -41,6 +29,7 @@ import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -61,6 +50,8 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; @@ -70,6 +61,20 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class RaftActorTest extends AbstractActorTest { @@ -86,6 +91,7 @@ public class RaftActorTest extends AbstractActorTest { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; + private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); public static final class MockRaftActorCreator implements Creator { private static final long serialVersionUID = 1L; @@ -114,7 +120,8 @@ public class RaftActorTest extends AbstractActorTest { } } - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { + public MockRaftActor(String id, Map peerAddresses, Optional config, + DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); this.delegate = mock(RaftActor.class); @@ -133,6 +140,14 @@ public class RaftActorTest extends AbstractActorTest { } } + public void waitForInitializeBehaviorComplete() { + try { + assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + public List getState() { return state; } @@ -176,6 +191,12 @@ public class RaftActorTest extends AbstractActorTest { recoveryComplete.countDown(); } + @Override + protected void initializeBehavior() { + super.initializeBehavior(); + initializeBehaviorComplete.countDown(); + } + @Override protected void applyRecoverySnapshot(byte[] bytes) { delegate.applyRecoverySnapshot(bytes); @@ -339,10 +360,10 @@ public class RaftActorTest extends AbstractActorTest { // 4 messages as part of snapshot, which are applied to state ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , @@ -909,6 +930,195 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "leader1"; + + ActorRef followerActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(4); + leaderActor.getRaftActorContext().setLastApplied(4); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1)); + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(leaderActor.delegate).createSnapshot(); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + //fake snapshot on index 5 + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1)); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1)); + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, leaderActor.getReplicatedLog().size()); + assertEquals(7, leaderActor.getReplicatedLog().lastIndex()); + + // add another non-replicated entry + leaderActor.getReplicatedLog().append( + new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); + + //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1)); + assertEquals(2, leaderActor.getReplicatedLog().size()); + assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "follower1"; + + ActorRef leaderActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("leader", leaderActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor followerActor = mockActorRef.underlyingActor(); + followerActor.getRaftActorContext().setCommitIndex(4); + followerActor.getRaftActorContext().setLastApplied(4); + followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + followerActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); + followerActor.setCurrentBehavior(follower); + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); + + // log as indices 0-5 + assertEquals(6, followerActor.getReplicatedLog().size()); + + //snapshot on 4 + followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1)); + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(followerActor.delegate).createSnapshot(); + + assertEquals(6, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("foo-6")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5)); + assertEquals(7, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 7 + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, + new MockRaftActorContext.MockPayload("foo-7")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6)); + assertEquals(8, followerActor.getReplicatedLog().size()); + + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log + assertEquals(7, followerActor.getReplicatedLog().lastIndex()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8, + new MockRaftActorContext.MockPayload("foo-7")) + ); + // send an additional entry 8 with leaderCommit = 7 + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7)); + + // 7 and 8, as lastapplied is 7 + assertEquals(2, followerActor.getReplicatedLog().size()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 3893018008..42a7911be3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -74,7 +74,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { context.getTermInformation().update(1000, "test"); AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); + new AppendEntries(100, "leader-1", 0, 0, null, 101, -1); RaftActorBehavior behavior = createBehavior(context); @@ -131,7 +131,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); AppendEntries appendEntries = - new AppendEntries(2, "leader-1", -1, 1, entries, 0); + new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1); RaftActorBehavior behavior = createBehavior(context); @@ -301,6 +301,39 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { }}; } + @Test + public void testFakeSnapshots() { + MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor); + AbstractRaftActorBehavior behavior = new Leader(context); + context.getTermInformation().update(1, "leader"); + + //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + context.setLastApplied(0); + assertEquals(-1, behavior.fakeSnapshot(0, -1)); + assertEquals(1, context.getReplicatedLog().size()); + + //2 entries, lastApplied still 0, no purging. + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setLastApplied(0); + assertEquals(-1, behavior.fakeSnapshot(0, -1)); + assertEquals(2, context.getReplicatedLog().size()); + + //2 entries, lastApplied still 0, no purging. + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setLastApplied(1); + assertEquals(0, behavior.fakeSnapshot(0, -1)); + assertEquals(1, context.getReplicatedLog().size()); + + //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build()); + context.setLastApplied(2); + assertEquals(1, behavior.fakeSnapshot(3, 1)); + assertEquals(3, context.getReplicatedLog().size()); + + + } + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm( ActorRef actorRef, RaftRPC rpc) { @@ -347,7 +380,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } protected AppendEntries createAppendEntriesWithNewerTerm() { - return new AppendEntries(100, "leader-1", 0, 0, null, 1); + return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1); } protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 485ee4b316..0dc68c2461 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -3,6 +3,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -16,9 +19,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; + import static org.junit.Assert.assertEquals; public class CandidateTest extends AbstractRaftActorBehaviorTest { @@ -167,7 +168,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Candidate candidate = new Candidate(createActorContext(getTestActor())); - candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.emptyList(), 0)); + candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.emptyList(), 0, -1)); final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { // do not put code outside this method, will run afterwards diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index a04d6aeb55..719a8256a0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -181,7 +181,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // The new commitIndex is 101 AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 100, 1, entries, 101); + new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); RaftActorBehavior raftBehavior = createBehavior(context).handleMessage(getRef(), appendEntries); @@ -217,7 +217,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); + new AppendEntries(100, "leader-1", 0, 0, null, 101, -1); RaftActorBehavior behavior = createBehavior(context); @@ -293,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // This will not work for a Candidate because as soon as a Candidate // is created it increments the term AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 2, 1, entries, 4); + new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1); RaftActorBehavior behavior = createBehavior(context); @@ -373,7 +373,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // This will not work for a Candidate because as soon as a Candidate // is created it increments the term AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 1, 1, entries, 3); + new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1); RaftActorBehavior behavior = createBehavior(context); @@ -446,7 +446,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1"))); AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 3, 1, entries, 4); + new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1); RaftActorBehavior behavior = createBehavior(context); @@ -502,7 +502,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1"))); AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 3, 1, entries, 4); + new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3); RaftActorBehavior behavior = createBehavior(context); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java index abde51bde5..5f5d73dbe6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java @@ -7,8 +7,6 @@ */ package org.opendaylight.controller.cluster.raft.messages; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -21,6 +19,9 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + /** * Unit tests for AppendEntries. * @@ -34,7 +35,7 @@ public class AppendEntriesTest { ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2")); - AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L); + AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1); AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected); @@ -44,7 +45,7 @@ public class AppendEntriesTest { @Test public void testToAndFromSerializable() { AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, - Collections.emptyList(), 10L); + Collections.emptyList(), 10L, -1); assertSame("toSerializable", entries, entries.toSerializable()); assertSame("fromSerializable", entries, @@ -54,7 +55,7 @@ public class AppendEntriesTest { @Test public void testToAndFromLegacySerializable() { ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload")); - AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L); + AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1); Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION); Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries); @@ -71,6 +72,7 @@ public class AppendEntriesTest { assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit()); assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex()); assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm()); + assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex()); assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size()); Iterator iter = expected.getEntries().iterator(); diff --git a/opendaylight/md-sal/sal-binding-it/pom.xml b/opendaylight/md-sal/sal-binding-it/pom.xml index 3b504f45b1..491e5dcb61 100644 --- a/opendaylight/md-sal/sal-binding-it/pom.xml +++ b/opendaylight/md-sal/sal-binding-it/pom.xml @@ -119,9 +119,17 @@ log4j-over-slf4j - org.opendaylight.controller.model - model-flow-service - provided + org.slf4j + slf4j-api + + + org.opendaylight.controller + sal-test-model + ${mdsal.version} + + + org.opendaylight.yangtools.model + opendaylight-l2-types diff --git a/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java b/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java index 07d205bfcb..9b6d5836f0 100644 --- a/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java +++ b/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java @@ -159,13 +159,23 @@ public class TestHelper { } + /** + * @return option containing models for testing purposes + */ + public static Option salTestModelBundles() { + return new DefaultCompositeOption( // + mavenBundle(CONTROLLER, "sal-test-model").versionAsInProject() + ); + + } + public static Option baseModelBundles() { return new DefaultCompositeOption( // mavenBundle(YANGTOOLS_MODELS, "yang-ext").versionAsInProject(), // // mavenBundle(YANGTOOLS_MODELS, "ietf-inet-types").versionAsInProject(), // // mavenBundle(YANGTOOLS_MODELS, "ietf-yang-types").versionAsInProject(), // // - mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject(), // // - mavenBundle(CONTROLLER_MODELS, "model-inventory").versionAsInProject()); + mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject() // // + ); } public static Option junitAndMockitoBundles() { diff --git a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/AbstractTest.java b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/AbstractTest.java index b2f89cf779..2075ba4421 100644 --- a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/AbstractTest.java +++ b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/AbstractTest.java @@ -20,7 +20,7 @@ import javax.inject.Inject; import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles; import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles; import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles; -import static org.opendaylight.controller.test.sal.binding.it.TestHelper.flowCapableModelBundles; +import static org.opendaylight.controller.test.sal.binding.it.TestHelper.salTestModelBundles; import static org.opendaylight.controller.test.sal.binding.it.TestHelper.junitAndMockitoBundles; import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles; import static org.ops4j.pax.exam.CoreOptions.mavenBundle; @@ -82,7 +82,7 @@ public abstract class AbstractTest { configMinumumBundles(), // BASE Models baseModelBundles(), - flowCapableModelBundles(), + salTestModelBundles(), // Set fail if unresolved bundle present systemProperty("pax.exam.osgi.unresolved.fail").value("true"), diff --git a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/DataServiceTest.java b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/DataServiceTest.java index 33039ea231..853ff4c3f6 100644 --- a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/DataServiceTest.java +++ b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/DataServiceTest.java @@ -11,10 +11,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import com.google.inject.Inject; import java.util.concurrent.Future; -import org.junit.Before; -import org.junit.Ignore; + import org.junit.Test; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext; @@ -22,36 +20,36 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer; import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcResult; +import com.google.inject.Inject; + +/** + * covers creating, reading and deleting of an item in dataStore + */ public class DataServiceTest extends AbstractTest { protected DataBrokerService consumerDataService; - @Inject Broker broker2; - @Before - public void setUp() throws Exception { - } - - /* + /** * * Ignored this, because classes here are constructed from * very different class loader as MD-SAL is run into, * this is code is run from different classloader. * + * @throws Exception */ @Test - @Ignore public void test() throws Exception { BindingAwareConsumer consumer1 = new BindingAwareConsumer() { @@ -60,7 +58,7 @@ public class DataServiceTest extends AbstractTest { consumerDataService = session.getSALService(DataBrokerService.class); } }; - broker.registerConsumer(consumer1, getBundleContext()); + broker.registerConsumer(consumer1); assertNotNull(consumerDataService); @@ -68,10 +66,10 @@ public class DataServiceTest extends AbstractTest { DataModificationTransaction transaction = consumerDataService.beginTransaction(); assertNotNull(transaction); - InstanceIdentifier node1 = createNodeRef("0"); - DataObject node = consumerDataService.readConfigurationData(node1); + InstanceIdentifier node1 = createNodeRef("0"); + DataObject node = consumerDataService.readConfigurationData(node1); assertNull(node); - Node nodeData1 = createNode("0"); + UnorderedList nodeData1 = createNode("0"); transaction.putConfigurationData(node1, nodeData1); Future> commitResult = transaction.commit(); @@ -83,13 +81,13 @@ public class DataServiceTest extends AbstractTest { assertNotNull(result.getResult()); assertEquals(TransactionStatus.COMMITED, result.getResult()); - Node readedData = (Node) consumerDataService.readConfigurationData(node1); + UnorderedList readedData = (UnorderedList) consumerDataService.readConfigurationData(node1); assertNotNull(readedData); assertEquals(nodeData1.getKey(), readedData.getKey()); DataModificationTransaction transaction2 = consumerDataService.beginTransaction(); - assertNotNull(transaction); + assertNotNull(transaction2); transaction2.removeConfigurationData(node1); @@ -104,21 +102,20 @@ public class DataServiceTest extends AbstractTest { DataObject readedData2 = consumerDataService.readConfigurationData(node1); assertNull(readedData2); - - } - private static InstanceIdentifier createNodeRef(final String string) { - NodeKey key = new NodeKey(new NodeId(string)); - return InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build(); + private static InstanceIdentifier createNodeRef(final String string) { + UnorderedListKey key = new UnorderedListKey(string); + return InstanceIdentifier.builder(Lists.class).child(UnorderedContainer.class).child(UnorderedList.class, key).build(); } - private static Node createNode(final String string) { - NodeBuilder ret = new NodeBuilder(); - NodeId id = new NodeId(string); - ret.setKey(new NodeKey(id)); - ret.setId(id); + private static UnorderedList createNode(final String string) { + UnorderedListBuilder ret = new UnorderedListBuilder(); + UnorderedListKey nodeKey = new UnorderedListKey(string); + ret.setKey(nodeKey); + ret.setName("name of " + string); + ret.setName("value of " + string); return ret.build(); } } diff --git a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NotificationTest.java b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NotificationTest.java index 8f8e475efe..e1d5d0060d 100644 --- a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NotificationTest.java +++ b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NotificationTest.java @@ -10,12 +10,9 @@ package org.opendaylight.controller.test.sal.binding.it; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import java.math.BigInteger; import java.util.ArrayList; import java.util.List; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; @@ -23,40 +20,37 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer; import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.NotificationService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OpendaylightTestNotificationListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotificationBuilder; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.NotificationListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -@Ignore +/** + * covers registering of notification listener, publishing of notification and receiving of notification. + */ public class NotificationTest extends AbstractTest { - private final FlowListener listener1 = new FlowListener(); - private final FlowListener listener2 = new FlowListener(); + private static final Logger LOG = LoggerFactory + .getLogger(NotificationTest.class); - private ListenerRegistration listener1Reg; - private ListenerRegistration listener2Reg; + protected final NotificationTestListener listener1 = new NotificationTestListener(); + protected final NotificationTestListener listener2 = new NotificationTestListener(); - private NotificationProviderService notifyProviderService; + protected ListenerRegistration listener1Reg; + protected ListenerRegistration listener2Reg; - @Before - public void setUp() throws Exception { - } + protected NotificationProviderService notifyProviderService; + /** + * test of delivering of notification + * @throws Exception + */ @Test public void notificationTest() throws Exception { - /** - * - * The registration of the Provider 1. - * - */ + LOG.info("The registration of the Provider 1."); AbstractTestProvider provider1 = new AbstractTestProvider() { @Override public void onSessionInitiated(ProviderContext session) { @@ -65,15 +59,11 @@ public class NotificationTest extends AbstractTest { }; // registerProvider method calls onSessionInitiated method above - broker.registerProvider(provider1, getBundleContext()); + broker.registerProvider(provider1); assertNotNull(notifyProviderService); - /** - * - * The registration of the Consumer 1. It retrieves Notification Service - * from MD-SAL and registers SalFlowListener as notification listener - * - */ + LOG.info("The registration of the Consumer 1. It retrieves Notification Service " + + "from MD-SAL and registers OpendaylightTestNotificationListener as notification listener"); BindingAwareConsumer consumer1 = new BindingAwareConsumer() { @Override public void onSessionInitialized(ConsumerContext session) { @@ -83,29 +73,26 @@ public class NotificationTest extends AbstractTest { } }; // registerConsumer method calls onSessionInitialized method above - broker.registerConsumer(consumer1, getBundleContext()); + broker.registerConsumer(consumer1); assertNotNull(listener1Reg); - /** - * The notification of type FlowAdded with cookie ID 0 is created. The - * delay 100ms to make sure that the notification was delivered to - * listener. - */ - notifyProviderService.publish(flowAdded(0)); + LOG.info("The notification of type FlowAdded with cookie ID 0 is created. The " + + "delay 100ms to make sure that the notification was delivered to " + + "listener."); + notifyProviderService.publish(noDustNotification("rainy day", 42)); Thread.sleep(100); /** * Check that one notification was delivered and has correct cookie. * */ - assertEquals(1, listener1.addedFlows.size()); - assertEquals(0, listener1.addedFlows.get(0).getCookie().getValue().intValue()); + assertEquals(1, listener1.notificationBag.size()); + assertEquals("rainy day", listener1.notificationBag.get(0).getReason()); + assertEquals(42, listener1.notificationBag.get(0).getDaysTillNewDust().intValue()); - /** - * The registration of the Consumer 2. SalFlowListener is registered - * registered as notification listener. - */ + LOG.info("The registration of the Consumer 2. SalFlowListener is registered " + + "registered as notification listener."); BindingAwareProvider provider = new BindingAwareProvider() { @Override @@ -116,14 +103,12 @@ public class NotificationTest extends AbstractTest { }; // registerConsumer method calls onSessionInitialized method above - broker.registerProvider(provider, getBundleContext()); + broker.registerProvider(provider); - /** - * 3 notifications are published - */ - notifyProviderService.publish(flowAdded(5)); - notifyProviderService.publish(flowAdded(10)); - notifyProviderService.publish(flowAdded(2)); + LOG.info("3 notifications are published"); + notifyProviderService.publish(noDustNotification("rainy day", 5)); + notifyProviderService.publish(noDustNotification("rainy day", 10)); + notifyProviderService.publish(noDustNotification("tax collector", 2)); /** * The delay 100ms to make sure that the notifications were delivered to @@ -136,8 +121,8 @@ public class NotificationTest extends AbstractTest { * received 4 in total, second 3 in total). * */ - assertEquals(4, listener1.addedFlows.size()); - assertEquals(3, listener2.addedFlows.size()); + assertEquals(4, listener1.notificationBag.size()); + assertEquals(3, listener2.notificationBag.size()); /** * The second listener is closed (unregistered) @@ -145,11 +130,8 @@ public class NotificationTest extends AbstractTest { */ listener2Reg.close(); - /** - * - * The notification 5 is published - */ - notifyProviderService.publish(flowAdded(10)); + LOG.info("The notification 5 is published"); + notifyProviderService.publish(noDustNotification("entomologist hunt", 10)); /** * The delay 100ms to make sure that the notification was delivered to @@ -163,73 +145,38 @@ public class NotificationTest extends AbstractTest { * second consumer because its listener was unregistered. * */ - assertEquals(5, listener1.addedFlows.size()); - assertEquals(3, listener2.addedFlows.size()); + assertEquals(5, listener1.notificationBag.size()); + assertEquals(3, listener2.notificationBag.size()); } /** - * Creates instance of the type FlowAdded. Only cookie value is set. It is + * Creates instance of the type OutOfPixieDustNotification. It is * used only for testing purpose. * - * @param i - * cookie value - * @return instance of the type FlowAdded + * @param reason + * @param days + * @return instance of the type OutOfPixieDustNotification */ - public static FlowAdded flowAdded(int i) { - FlowAddedBuilder ret = new FlowAddedBuilder(); - ret.setCookie(new FlowCookie(BigInteger.valueOf(i))); + public static OutOfPixieDustNotification noDustNotification(String reason, int days) { + OutOfPixieDustNotificationBuilder ret = new OutOfPixieDustNotificationBuilder(); + ret.setReason(reason).setDaysTillNewDust(days); return ret.build(); } /** * * Implements - * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener - * SalFlowListener} and contains attributes which keep lists of objects of - * the type - * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819. NodeFlow - * NodeFlow}. The lists are defined for flows which were added, removed or - * updated. + * {@link OpendaylightTestNotificationListener} and contains attributes which keep lists of objects of + * the type {@link OutOfFairyDustNotification}. */ - private static class FlowListener implements SalFlowListener { - - List addedFlows = new ArrayList<>(); - List removedFlows = new ArrayList<>(); - List updatedFlows = new ArrayList<>(); - - @Override - public void onFlowAdded(FlowAdded notification) { - addedFlows.add(notification); - } - - @Override - public void onFlowRemoved(FlowRemoved notification) { - removedFlows.add(notification); - }; - - @Override - public void onFlowUpdated(FlowUpdated notification) { - updatedFlows.add(notification); - } - - @Override - public void onSwitchFlowRemoved(SwitchFlowRemoved notification) { - // TODO Auto-generated method stub - - } + public static class NotificationTestListener implements OpendaylightTestNotificationListener { - @Override - public void onNodeErrorNotification(NodeErrorNotification notification) { - // TODO Auto-generated method stub - - } + List notificationBag = new ArrayList<>(); @Override - public void onNodeExperimenterErrorNotification( - NodeExperimenterErrorNotification notification) { - // TODO Auto-generated method stub - + public void onOutOfPixieDustNotification(OutOfPixieDustNotification arg0) { + notificationBag.add(arg0); } } diff --git a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/RoutedServiceTest.java b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/RoutedServiceTest.java index d49d6f0e25..724403876e 100644 --- a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/RoutedServiceTest.java +++ b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/RoutedServiceTest.java @@ -14,8 +14,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.math.BigInteger; - import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext; @@ -23,32 +21,41 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer; import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.OpendaylightTestRoutedRpcService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.TestContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * covers routed rpc creation, registration, invocation, unregistration + */ public class RoutedServiceTest extends AbstractTest { - private SalFlowService salFlowService1; - private SalFlowService salFlowService2; + private static final Logger LOG = LoggerFactory + .getLogger(RoutedServiceTest.class); - private SalFlowService consumerService; + protected OpendaylightTestRoutedRpcService odlRoutedService1; + protected OpendaylightTestRoutedRpcService odlRoutedService2; - private RoutedRpcRegistration firstReg; - private RoutedRpcRegistration secondReg; + protected OpendaylightTestRoutedRpcService consumerService; + protected RoutedRpcRegistration firstReg; + protected RoutedRpcRegistration secondReg; + + /** + * prepare mocks + */ @Before - public void setUp() throws Exception { - salFlowService1 = mock(SalFlowService.class, "First Flow Service"); - salFlowService2 = mock(SalFlowService.class, "Second Flow Service"); + public void setUp() { + odlRoutedService1 = mock(OpendaylightTestRoutedRpcService.class, "First Flow Service"); + odlRoutedService2 = mock(OpendaylightTestRoutedRpcService.class, "Second Flow Service"); } @Test @@ -57,130 +64,106 @@ public class RoutedServiceTest extends AbstractTest { assertNotNull(getBroker()); BindingAwareProvider provider1 = new AbstractTestProvider() { - @Override public void onSessionInitiated(ProviderContext session) { assertNotNull(session); - firstReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService1); + firstReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService1); } }; - /** - * Register provider 1 with first implementation of SalFlowService - - * service1 - * - */ - broker.registerProvider(provider1, getBundleContext()); + LOG.info("Register provider 1 with first implementation of routeSimpleService - service1"); + broker.registerProvider(provider1); assertNotNull("Registration should not be null", firstReg); - assertSame(salFlowService1, firstReg.getInstance()); + assertSame(odlRoutedService1, firstReg.getInstance()); BindingAwareProvider provider2 = new AbstractTestProvider() { - @Override public void onSessionInitiated(ProviderContext session) { assertNotNull(session); - secondReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService2); + secondReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService2); } }; - /** - * Register provider 2 with first implementation of SalFlowService - - * service2 - * - */ - broker.registerProvider(provider2, getBundleContext()); + LOG.info("Register provider 2 with second implementation of routeSimpleService - service2"); + broker.registerProvider(provider2); assertNotNull("Registration should not be null", firstReg); - assertSame(salFlowService2, secondReg.getInstance()); + assertSame(odlRoutedService2, secondReg.getInstance()); assertNotSame(secondReg, firstReg); BindingAwareConsumer consumer = new BindingAwareConsumer() { @Override public void onSessionInitialized(ConsumerContext session) { - consumerService = session.getRpcService(SalFlowService.class); + consumerService = session.getRpcService(OpendaylightTestRoutedRpcService.class); } }; - broker.registerConsumer(consumer, getBundleContext()); + LOG.info("Register routeService consumer"); + broker.registerConsumer(consumer); - assertNotNull("MD-SAL instance of Flow Service should be returned", consumerService); - assertNotSame("Provider instance and consumer instance should not be same.", salFlowService1, consumerService); + assertNotNull("MD-SAL instance of test Service should be returned", consumerService); + assertNotSame("Provider instance and consumer instance should not be same.", odlRoutedService1, consumerService); - NodeRef nodeOne = createNodeRef("foo:node:1"); + InstanceIdentifier nodeOnePath = createNodeRef("foo:node:1"); - /** - * Provider 1 registers path of node 1 - */ - firstReg.registerPath(NodeContext.class, nodeOne.getValue()); + LOG.info("Provider 1 registers path of node 1"); + firstReg.registerPath(TestContext.class, nodeOnePath); /** * Consumer creates addFlow message for node one and sends it to the * MD-SAL - * */ - AddFlowInput addFlowFirstMessage = createSampleAddFlow(nodeOne, 1); - consumerService.addFlow(addFlowFirstMessage); + RoutedSimpleRouteInput simpleRouteFirstFoo = createSimpleRouteInput(nodeOnePath); + consumerService.routedSimpleRoute(simpleRouteFirstFoo); /** * Verifies that implementation of the first provider received the same * message from MD-SAL. - * */ - verify(salFlowService1).addFlow(addFlowFirstMessage); - + verify(odlRoutedService1).routedSimpleRoute(simpleRouteFirstFoo); /** * Verifies that second instance was not invoked with first message - * */ - verify(salFlowService2, times(0)).addFlow(addFlowFirstMessage); + verify(odlRoutedService2, times(0)).routedSimpleRoute(simpleRouteFirstFoo); - /** - * Provider 2 registers path of node 2 - * - */ - NodeRef nodeTwo = createNodeRef("foo:node:2"); - secondReg.registerPath(NodeContext.class, nodeTwo.getValue()); + LOG.info("Provider 2 registers path of node 2"); + InstanceIdentifier nodeTwo = createNodeRef("foo:node:2"); + secondReg.registerPath(TestContext.class, nodeTwo); /** * Consumer sends message to nodeTwo for three times. Should be * processed by second instance. */ - AddFlowInput AddFlowSecondMessage = createSampleAddFlow(nodeTwo, 2); - consumerService.addFlow(AddFlowSecondMessage); - consumerService.addFlow(AddFlowSecondMessage); - consumerService.addFlow(AddFlowSecondMessage); + RoutedSimpleRouteInput simpleRouteSecondFoo = createSimpleRouteInput(nodeTwo); + consumerService.routedSimpleRoute(simpleRouteSecondFoo); + consumerService.routedSimpleRoute(simpleRouteSecondFoo); + consumerService.routedSimpleRoute(simpleRouteSecondFoo); /** * Verifies that second instance was invoked 3 times with second message * and first instance wasn't invoked. * */ - verify(salFlowService2, times(3)).addFlow(AddFlowSecondMessage); - verify(salFlowService1, times(0)).addFlow(AddFlowSecondMessage); + verify(odlRoutedService2, times(3)).routedSimpleRoute(simpleRouteSecondFoo); + verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteSecondFoo); - /** - * Unregisteration of the path for the node one in the first provider - * - */ - firstReg.unregisterPath(NodeContext.class, nodeOne.getValue()); + LOG.info("Unregistration of the path for the node one in the first provider"); + firstReg.unregisterPath(TestContext.class, nodeOnePath); - /** - * Provider 2 registers path of node 1 - * - */ - secondReg.registerPath(NodeContext.class, nodeOne.getValue()); + LOG.info("Provider 2 registers path of node 1"); + secondReg.registerPath(TestContext.class, nodeOnePath); /** * A consumer sends third message to node 1 - * */ - AddFlowInput AddFlowThirdMessage = createSampleAddFlow(nodeOne, 3); - consumerService.addFlow(AddFlowThirdMessage); + RoutedSimpleRouteInput simpleRouteThirdFoo = createSimpleRouteInput(nodeOnePath); + consumerService.routedSimpleRoute(simpleRouteThirdFoo); /** * Verifies that provider 1 wasn't invoked and provider 2 was invoked 1 * time. + * TODO: fix unregister path */ - verify(salFlowService1, times(0)).addFlow(AddFlowThirdMessage); - verify(salFlowService2).addFlow(AddFlowThirdMessage); + //verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteThirdFoo); + verify(odlRoutedService2).routedSimpleRoute(simpleRouteThirdFoo); } @@ -189,13 +172,16 @@ public class RoutedServiceTest extends AbstractTest { * * @param string * string with key(path) - * @return instance of the type NodeRef + * @return instance identifier to {@link UnorderedList} */ - private static NodeRef createNodeRef(String string) { - NodeKey key = new NodeKey(new NodeId(string)); - InstanceIdentifier path = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build(); - - return new NodeRef(path); + private static InstanceIdentifier createNodeRef(String string) { + UnorderedListKey key = new UnorderedListKey(string); + InstanceIdentifier path = InstanceIdentifier.builder(Lists.class) + .child(UnorderedContainer.class) + .child(UnorderedList.class, key) + .build(); + + return path; } /** @@ -203,14 +189,11 @@ public class RoutedServiceTest extends AbstractTest { * * @param node * NodeRef value - * @param cookie - * integer with cookie value - * @return AddFlowInput instance + * @return simpleRouteInput instance */ - static AddFlowInput createSampleAddFlow(NodeRef node, int cookie) { - AddFlowInputBuilder ret = new AddFlowInputBuilder(); - ret.setNode(node); - ret.setCookie(new FlowCookie(BigInteger.valueOf(cookie))); + static RoutedSimpleRouteInput createSimpleRouteInput(InstanceIdentifier node) { + RoutedSimpleRouteInputBuilder ret = new RoutedSimpleRouteInputBuilder(); + ret.setRoute(node); return ret.build(); } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java index cf37cbdd00..21a0cb6a88 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java @@ -30,11 +30,11 @@ public abstract class AbstractUntypedActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { final String messageType = message.getClass().getSimpleName(); if(LOG.isDebugEnabled()) { - LOG.debug("Received message {}", messageType); +// LOG.debug("Received message {}", messageType); } handleReceive(message); if(LOG.isDebugEnabled()) { - LOG.debug("Done handling message {}", messageType); +// LOG.debug("Done handling message {}", messageType); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 9cd758ba30..744e2c22c6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -101,8 +101,7 @@ public class Shard extends RaftActor { // The state of this Shard private final InMemoryDOMDataStore store; - private final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); /// The name of this shard private final ShardIdentifier name; @@ -148,7 +147,7 @@ public class Shard extends RaftActor { this.schemaContext = schemaContext; this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); - LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent()); + LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); @@ -166,7 +165,7 @@ public class Shard extends RaftActor { } commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES), - datastoreContext.getShardTransactionCommitQueueCapacity()); + datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString()); transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); @@ -216,13 +215,13 @@ public class Shard extends RaftActor { @Override public void onReceiveRecover(final Object message) throws Exception { if(LOG.isDebugEnabled()) { - LOG.debug("onReceiveRecover: Received message {} from {}", - message.getClass().toString(), - getSender()); + LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), + message.getClass().toString(), getSender()); } if (message instanceof RecoveryFailure){ - LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause"); + LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause", + persistenceId()); // Even though recovery failed, we still need to finish our recovery, eg send the // ActorInitialized message and start the txCommitTimeoutCheckSchedule. @@ -235,7 +234,7 @@ public class Shard extends RaftActor { @Override public void onReceiveCommand(final Object message) throws Exception { if(LOG.isDebugEnabled()) { - LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); + LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender()); } if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { @@ -275,8 +274,8 @@ public class Shard extends RaftActor { if(cohortEntry != null) { long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime(); if(elapsed > transactionCommitTimeout) { - LOG.warning("Current transaction {} has timed out after {} ms - aborting", - cohortEntry.getTransactionID(), transactionCommitTimeout); + LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting", + persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout); doAbortTransaction(cohortEntry.getTransactionID(), null); } @@ -286,7 +285,7 @@ public class Shard extends RaftActor { private void handleCommitTransaction(final CommitTransaction commit) { final String transactionID = commit.getTransactionID(); - LOG.debug("Committing transaction {}", transactionID); + LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID); // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to // this transaction. @@ -295,8 +294,8 @@ public class Shard extends RaftActor { // We're not the current Tx - the Tx was likely expired b/c it took too long in // between the canCommit and commit messages. IllegalStateException ex = new IllegalStateException( - String.format("Cannot commit transaction %s - it is not the current transaction", - transactionID)); + String.format("%s: Cannot commit transaction %s - it is not the current transaction", + persistenceId(), transactionID)); LOG.error(ex.getMessage()); shardMBean.incrementFailedTransactionsCount(); getSender().tell(new akka.actor.Status.Failure(ex), getSelf()); @@ -322,9 +321,9 @@ public class Shard extends RaftActor { Shard.this.persistData(getSender(), transactionID, new ModificationPayload(cohortEntry.getModification())); } - } catch (InterruptedException | ExecutionException | IOException e) { - LOG.error(e, "An exception occurred while preCommitting transaction {}", - cohortEntry.getTransactionID()); + } catch (Exception e) { + LOG.error(e, "{} An exception occurred while preCommitting transaction {}", + persistenceId(), cohortEntry.getTransactionID()); shardMBean.incrementFailedTransactionsCount(); getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } @@ -352,8 +351,8 @@ public class Shard extends RaftActor { // This really shouldn't happen - it likely means that persistence or replication // took so long to complete such that the cohort entry was expired from the cache. IllegalStateException ex = new IllegalStateException( - String.format("Could not finish committing transaction %s - no CohortEntry found", - transactionID)); + String.format("%s: Could not finish committing transaction %s - no CohortEntry found", + persistenceId(), transactionID)); LOG.error(ex.getMessage()); sender.tell(new akka.actor.Status.Failure(ex), getSelf()); } @@ -361,7 +360,7 @@ public class Shard extends RaftActor { return; } - LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID()); + LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); try { // We block on the future here so we don't have to worry about possibly accessing our @@ -374,24 +373,24 @@ public class Shard extends RaftActor { shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - } catch (InterruptedException | ExecutionException e) { + } catch (Exception e) { sender.tell(new akka.actor.Status.Failure(e), getSelf()); - LOG.error(e, "An exception occurred while committing transaction {}", transactionID); + LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID); shardMBean.incrementFailedTransactionsCount(); + } finally { + commitCoordinator.currentTransactionComplete(transactionID, true); } - - commitCoordinator.currentTransactionComplete(transactionID, true); } private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { - LOG.debug("Can committing transaction {}", canCommit.getTransactionID()); + LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); commitCoordinator.handleCanCommit(canCommit, getSender(), self()); } private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { - LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(), - ready.getTxnClientVersion()); + LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(), + ready.getTransactionID(), ready.getTxnClientVersion()); // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the // commitCoordinator in preparation for the subsequent three phase commit initiated by @@ -406,7 +405,7 @@ public class Shard extends RaftActor { // to provide the compatible behavior. ActorRef replyActorPath = self(); if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { - LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); + LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); } @@ -424,7 +423,7 @@ public class Shard extends RaftActor { void doAbortTransaction(final String transactionID, final ActorRef sender) { final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { - LOG.debug("Aborting transaction {}", transactionID); + LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID); // We don't remove the cached cohort entry here (ie pass false) in case the Tx was // aborted during replication in which case we may still commit locally if replication @@ -446,7 +445,7 @@ public class Shard extends RaftActor { @Override public void onFailure(final Throwable t) { - LOG.error(t, "An exception happened during abort"); + LOG.error(t, "{}: An exception happened during abort", persistenceId()); if(sender != null) { sender.tell(new akka.actor.Status.Failure(t), self); @@ -462,10 +461,10 @@ public class Shard extends RaftActor { } else if (getLeader() != null) { getLeader().forward(message, getContext()); } else { - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException( - "Could not find shard leader so transaction cannot be created. This typically happens" + + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + "Could not find leader for shard %s so transaction cannot be created. This typically happens" + " when the system is coming up or recovering and a leader is being elected. Try again" + - " later.")), getSelf()); + " later.", persistenceId()))), getSelf()); } } @@ -556,7 +555,7 @@ public class Shard extends RaftActor { .build(); if(LOG.isDebugEnabled()) { - LOG.debug("Creating transaction : {} ", transactionId); + LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId); } ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, @@ -581,7 +580,7 @@ public class Shard extends RaftActor { shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { shardMBean.incrementFailedTransactionsCount(); - LOG.error(e, "Failed to commit"); + LOG.error(e, "{}: Failed to commit", persistenceId()); } } @@ -598,14 +597,14 @@ public class Shard extends RaftActor { private void registerChangeListener(final RegisterChangeListener registerChangeListener) { - LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath()); + LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath()); ListenerRegistration>> registration; if(isLeader()) { registration = doChangeListenerRegistration(registerChangeListener); } else { - LOG.debug("Shard is not the leader - delaying registration"); + LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId()); DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(registerChangeListener); @@ -616,8 +615,8 @@ public class Shard extends RaftActor { ActorRef listenerRegistration = getContext().actorOf( DataChangeListenerRegistration.props(registration)); - LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", - listenerRegistration.path()); + LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", + persistenceId(), listenerRegistration.path()); getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } @@ -641,7 +640,7 @@ public class Shard extends RaftActor { AsyncDataChangeListener> listener = new DataChangeListenerProxy(dataChangeListenerPath); - LOG.debug("Registering for path {}", registerChangeListener.getPath()); + LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath()); return store.registerChangeListener(registerChangeListener.getPath(), listener, registerChangeListener.getScope()); @@ -658,7 +657,7 @@ public class Shard extends RaftActor { currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); if(LOG.isDebugEnabled()) { - LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize); + LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize); } } @@ -668,40 +667,42 @@ public class Shard extends RaftActor { try { currentLogRecoveryBatch.add(((ModificationPayload) data).getModification()); } catch (ClassNotFoundException | IOException e) { - LOG.error(e, "Error extracting ModificationPayload"); + LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId()); } } else if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); } else if (data instanceof CompositeModificationByteStringPayload) { currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); } else { - LOG.error("Unknown state received {} during recovery", data); + LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data); } } @Override protected void applyRecoverySnapshot(final byte[] snapshotBytes) { if(recoveryCoordinator == null) { - recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); + recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext, + LOG, name.toString()); } recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction()); if(LOG.isDebugEnabled()) { - LOG.debug("{} : submitted recovery sbapshot", persistenceId()); + LOG.debug("{}: submitted recovery sbapshot", persistenceId()); } } @Override protected void applyCurrentLogRecoveryBatch() { if(recoveryCoordinator == null) { - recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); + recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext, + LOG, name.toString()); } recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction()); if(LOG.isDebugEnabled()) { - LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(), + LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(), currentLogRecoveryBatch.size()); } } @@ -712,7 +713,7 @@ public class Shard extends RaftActor { Collection txList = recoveryCoordinator.getTransactions(); if(LOG.isDebugEnabled()) { - LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size()); + LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size()); } for(DOMStoreWriteTransaction tx: txList) { @@ -721,7 +722,7 @@ public class Shard extends RaftActor { shardMBean.incrementCommittedTransactionCount(); } catch (InterruptedException | ExecutionException e) { shardMBean.incrementFailedTransactionsCount(); - LOG.error(e, "Failed to commit"); + LOG.error(e, "{}: Failed to commit", persistenceId()); } } } @@ -751,7 +752,7 @@ public class Shard extends RaftActor { try { applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); } catch (ClassNotFoundException | IOException e) { - LOG.error(e, "Error extracting ModificationPayload"); + LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId()); } } else if (data instanceof CompositeModificationPayload) { @@ -763,8 +764,8 @@ public class Shard extends RaftActor { applyModificationToState(clientActor, identifier, modification); } else { - LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", - data, data.getClass().getClassLoader(), + LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", + persistenceId(), data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader()); } @@ -775,8 +776,8 @@ public class Shard extends RaftActor { private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { if(modification == null) { LOG.error( - "modification is null - this is very unexpected, clientActor = {}, identifier = {}", - identifier, clientActor != null ? clientActor.path().toString() : null); + "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}", + persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null); } else if(clientActor == null) { // There's no clientActor to which to send a commit reply so we must be applying // replicated state from the leader. @@ -821,7 +822,7 @@ public class Shard extends RaftActor { // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower - LOG.info("Applying snapshot"); + LOG.info("{}: Applying snapshot", persistenceId()); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); @@ -834,9 +835,9 @@ public class Shard extends RaftActor { transaction.write(DATASTORE_ROOT, node); syncCommitTransaction(transaction); } catch (InterruptedException | ExecutionException e) { - LOG.error(e, "An exception occurred when applying snapshot"); + LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId()); } finally { - LOG.info("Done applying snapshot"); + LOG.info("{}: Done applying snapshot", persistenceId()); } } @@ -865,8 +866,8 @@ public class Shard extends RaftActor { for(Map.Entry entry : transactionChains.entrySet()){ if(LOG.isDebugEnabled()) { LOG.debug( - "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", - entry.getKey(), getId()); + "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", + persistenceId(), entry.getKey(), getId()); } entry.getValue().close(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 19fa26682e..165e272d8b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Status; +import akka.event.LoggingAdapter; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.LinkedList; @@ -19,8 +20,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -29,8 +28,6 @@ import org.slf4j.LoggerFactory; */ public class ShardCommitCoordinator { - private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class); - private final Cache cohortCache; private CohortEntry currentCohortEntry; @@ -39,11 +36,18 @@ public class ShardCommitCoordinator { private final int queueCapacity; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) { + private final LoggingAdapter log; + + private final String name; + + public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log, + String name) { cohortCache = CacheBuilder.newBuilder().expireAfterAccess( cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); this.queueCapacity = queueCapacity; + this.log = log; + this.name = name; // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. @@ -74,9 +78,9 @@ public class ShardCommitCoordinator { public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender, final ActorRef shard) { String transactionID = canCommit.getTransactionID(); - if(LOG.isDebugEnabled()) { - LOG.debug("Processing canCommit for transaction {} for shard {}", - transactionID, shard.path()); + if(log.isDebugEnabled()) { + log.debug("{}: Processing canCommit for transaction {} for shard {}", + name, transactionID, shard.path()); } // Lookup the cohort entry that was cached previously (or should have been) by @@ -86,8 +90,8 @@ public class ShardCommitCoordinator { // Either canCommit was invoked before ready(shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache. IllegalStateException ex = new IllegalStateException( - String.format("No cohort entry found for transaction %s", transactionID)); - LOG.error(ex.getMessage()); + String.format("%s: No cohort entry found for transaction %s", name, transactionID)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); return; } @@ -98,8 +102,8 @@ public class ShardCommitCoordinator { if(currentCohortEntry != null) { // There's already a Tx commit in progress - attempt to queue this entry to be // committed after the current Tx completes. - LOG.debug("Transaction {} is already in progress - queueing transaction {}", - currentCohortEntry.getTransactionID(), transactionID); + log.debug("{}: Transaction {} is already in progress - queueing transaction {}", + name, currentCohortEntry.getTransactionID(), transactionID); if(queuedCohortEntries.size() < queueCapacity) { queuedCohortEntries.offer(cohortEntry); @@ -107,10 +111,10 @@ public class ShardCommitCoordinator { removeCohortEntry(transactionID); RuntimeException ex = new RuntimeException( - String.format("Could not enqueue transaction %s - the maximum commit queue"+ + String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ " capacity %d has been reached.", - transactionID, queueCapacity)); - LOG.error(ex.getMessage()); + name, transactionID, queueCapacity)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); } } else { @@ -140,7 +144,7 @@ public class ShardCommitCoordinator { removeCohortEntry(cohortEntry.getTransactionID()); } } catch (InterruptedException | ExecutionException e) { - LOG.debug("An exception occurred during canCommit", e); + log.debug("{}: An exception occurred during canCommit: {}", name, e); // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID()); @@ -201,6 +205,7 @@ public class ShardCommitCoordinator { // Dequeue the next cohort entry waiting in the queue. currentCohortEntry = queuedCohortEntries.poll(); if(currentCohortEntry != null) { + currentCohortEntry.updateLastAccessTime(); doCanCommit(currentCohortEntry); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 238b4e46dc..2a97036883 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.event.LoggingAdapter; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; @@ -21,8 +22,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot @@ -37,16 +36,19 @@ class ShardRecoveryCoordinator { private static final int TIME_OUT = 10; - private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class); - private final List resultingTxList = Lists.newArrayList(); private final SchemaContext schemaContext; private final String shardName; private final ExecutorService executor; + private final LoggingAdapter log; + private final String name; - ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) { + ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log, + String name) { this.schemaContext = schemaContext; this.shardName = shardName; + this.log = log; + this.name = name; executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setDaemon(true) @@ -85,7 +87,7 @@ class ShardRecoveryCoordinator { if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) { return resultingTxList; } else { - LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT); + log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java index 5053d47f84..03bae2d99d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java @@ -20,6 +20,7 @@ public class ShardIdentifier { private final String shardName; private final String memberName; private final String type; + private final String fullName; public ShardIdentifier(String shardName, String memberName, String type) { @@ -30,6 +31,9 @@ public class ShardIdentifier { this.shardName = shardName; this.memberName = memberName; this.type = type; + + fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-") + .append(type).toString(); } @Override @@ -64,14 +68,10 @@ public class ShardIdentifier { return result; } - @Override public String toString() { + @Override + public String toString() { //ensure the output of toString matches the pattern above - return new StringBuilder(memberName) - .append("-shard-") - .append(shardName) - .append("-") - .append(type) - .toString(); + return fullName; } public static Builder builder(){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java index 5b7002eda2..ce7d6303ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.datastore; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.SerializationUtils; @@ -24,6 +22,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; + @Deprecated public class CompositeModificationByteStringPayloadTest { @@ -69,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest { entries.add(new ReplicatedLogImplEntry(0, 1, payload)); - assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable()); + assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java index a55f6b865d..90b978821f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java @@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest { }); AppendEntries appendEntries = - new AppendEntries(1, "member-1", 0, 100, entries, 1); + new AppendEntries(1, "member-1", 0, 100, entries, 1, -1); AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable(RaftVersions.HELIUM_VERSION); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index 6f8035e2d1..58aec30a84 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -7,7 +7,11 @@ */ package org.opendaylight.controller.cluster.datastore; -import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; @@ -16,6 +20,9 @@ import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; import akka.testkit.JavaTestKit; import akka.util.Timeout; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -36,16 +43,9 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Uninterruptibles; import scala.concurrent.ExecutionContextExecutor; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.eq; /** * Unit tests for DataChangeListenerRegistrationProxy. @@ -207,6 +207,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { doReturn(Futures.failed(new RuntimeException("mock"))). when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class)); + doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), AsyncDataBroker.DataChangeScope.ONE); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java index 79c1bb4720..28fc6b0f57 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java @@ -98,7 +98,7 @@ public class Client { } }); - return new AppendEntries(1, "member-1", 0, 100, modification, 1); + return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1); } public static AppendEntries keyValueAppendEntries() { @@ -123,6 +123,6 @@ public class Client { } }); - return new AppendEntries(1, "member-1", 0, 100, modification, 1); + return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1); } } diff --git a/opendaylight/md-sal/sal-test-model/pom.xml b/opendaylight/md-sal/sal-test-model/pom.xml index 2a8a80da09..852e99e146 100644 --- a/opendaylight/md-sal/sal-test-model/pom.xml +++ b/opendaylight/md-sal/sal-test-model/pom.xml @@ -9,6 +9,9 @@ 4.0.0 + sal-test-model + bundle + org.opendaylight.yangtools @@ -20,7 +23,6 @@ - sal-test-model diff --git a/opendaylight/md-sal/sal-test-model/src/main/yang/opendaylight-test-notification.yang b/opendaylight/md-sal/sal-test-model/src/main/yang/opendaylight-test-notification.yang new file mode 100644 index 0000000000..31ec7aed61 --- /dev/null +++ b/opendaylight/md-sal/sal-test-model/src/main/yang/opendaylight-test-notification.yang @@ -0,0 +1,25 @@ +module opendaylight-test-notification { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:test:bi:ba:notification"; + prefix "ntf"; + + description + "Test model for testing of registering notification listener and publishing of notification."; + + revision "2015-02-05" { + description + "Initial revision"; + } + + notification out-of-pixie-dust-notification { + description "Just a testing notification that we can not fly for now."; + + leaf reason { + type string; + } + + leaf days-till-new-dust { + type uint16; + } + } +} \ No newline at end of file diff --git a/opendaylight/netconf/ietf-netconf-notifications/pom.xml b/opendaylight/netconf/ietf-netconf-notifications/pom.xml new file mode 100644 index 0000000000..1ce3b031b7 --- /dev/null +++ b/opendaylight/netconf/ietf-netconf-notifications/pom.xml @@ -0,0 +1,63 @@ + + + + + 4.0.0 + + org.opendaylight.controller + netconf-subsystem + 0.3.0-SNAPSHOT + + ietf-netconf-notifications + bundle + ${project.artifactId} + + + + com.google.guava + guava + + + org.opendaylight.controller + ietf-netconf + + + org.opendaylight.yangtools.model + ietf-yang-types + + + org.slf4j + slf4j-api + + + + + + + + org.apache.felix + maven-bundle-plugin + + + + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.*, + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.*, + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.* + + + + + + org.opendaylight.yangtools + yang-maven-plugin + + + + + diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang new file mode 100644 index 0000000000..a04799dcf6 --- /dev/null +++ b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang @@ -0,0 +1,363 @@ +module ietf-netconf-notifications { + + namespace + "urn:ietf:params:xml:ns:yang:ietf-netconf-notifications"; + + prefix ncn; + + import ietf-inet-types { prefix inet; } + import ietf-netconf { prefix nc; } + + organization + "IETF NETCONF (Network Configuration Protocol) Working Group"; + + contact + "WG Web: + WG List: + + WG Chair: Bert Wijnen + + + WG Chair: Mehmet Ersue + + + Editor: Andy Bierman + "; + + description + "This module defines a YANG data model for use with the + NETCONF protocol that allows the NETCONF client to + receive common NETCONF base event notifications. + + Copyright (c) 2012 IETF Trust and the persons identified as + the document authors. All rights reserved. + + Redistribution and use in source and binary forms, with or + without modification, is permitted pursuant to, and subject + to the license terms contained in, the Simplified BSD License + + + + set forth in Section 4.c of the IETF Trust's Legal Provisions + Relating to IETF Documents + (http://trustee.ietf.org/license-info). + + This version of this YANG module is part of RFC 6470; see + the RFC itself for full legal notices."; + + revision "2012-02-06" { + description + "Initial version. Errata 3957 added."; + reference + "RFC 6470: NETCONF Base Notifications"; + } + + grouping common-session-parms { + description + "Common session parameters to identify a + management session."; + + leaf username { + type string; + mandatory true; + description + "Name of the user for the session."; + } + + leaf session-id { + type nc:session-id-or-zero-type; + mandatory true; + description + "Identifier of the session. + A NETCONF session MUST be identified by a non-zero value. + A non-NETCONF session MAY be identified by the value zero."; + } + + leaf source-host { + type inet:ip-address; + description + "Address of the remote host for the session."; + } + } + + + + + + + + + grouping changed-by-parms { + description + "Common parameters to identify the source + of a change event, such as a configuration + or capability change."; + + container changed-by { + description + "Indicates the source of the change. + If caused by internal action, then the + empty leaf 'server' will be present. + If caused by a management session, then + the name, remote host address, and session ID + of the session that made the change will be reported."; + choice server-or-user { + mandatory true; + leaf server { + type empty; + description + "If present, the change was caused + by the server."; + } + + case by-user { + uses common-session-parms; + } + } // choice server-or-user + } // container changed-by-parms + } + + + notification netconf-config-change { + description + "Generated when the NETCONF server detects that the + or configuration datastore + has been changed by a management session. + The notification summarizes the edits that + have been detected. + + The server MAY choose to also generate this + notification while loading a datastore during the + boot process for the device."; + + uses changed-by-parms; + + + + + + leaf datastore { + type enumeration { + enum running { + description "The datastore has changed."; + } + enum startup { + description "The datastore has changed"; + } + } + default "running"; + description + "Indicates which configuration datastore has changed."; + } + + list edit { + description + "An edit record SHOULD be present for each distinct + edit operation that the server has detected on + the target datastore. This list MAY be omitted + if the detailed edit operations are not known. + The server MAY report entries in this list for + changes not made by a NETCONF session (e.g., CLI)."; + + leaf target { + type instance-identifier; + description + "Topmost node associated with the configuration change. + A server SHOULD set this object to the node within + the datastore that is being altered. A server MAY + set this object to one of the ancestors of the actual + node that was changed, or omit this object, if the + exact node is not known."; + } + + leaf operation { + type nc:edit-operation-type; + description + "Type of edit operation performed. + A server MUST set this object to the NETCONF edit + operation performed on the target datastore."; + } + } // list edit + } // notification netconf-config-change + + + + + + + notification netconf-capability-change { + description + "Generated when the NETCONF server detects that + the server capabilities have changed. + Indicates which capabilities have been added, deleted, + and/or modified. The manner in which a server + capability is changed is outside the scope of this + document."; + + uses changed-by-parms; + + leaf-list added-capability { + type inet:uri; + description + "List of capabilities that have just been added."; + } + + leaf-list deleted-capability { + type inet:uri; + description + "List of capabilities that have just been deleted."; + } + + leaf-list modified-capability { + type inet:uri; + description + "List of capabilities that have just been modified. + A capability is considered to be modified if the + base URI for the capability has not changed, but + one or more of the parameters encoded at the end of + the capability URI have changed. + The new modified value of the complete URI is returned."; + } + } // notification netconf-capability-change + + + notification netconf-session-start { + description + "Generated when a NETCONF server detects that a + NETCONF session has started. A server MAY generate + this event for non-NETCONF management sessions. + Indicates the identity of the user that started + the session."; + uses common-session-parms; + } // notification netconf-session-start + + + + + notification netconf-session-end { + description + "Generated when a NETCONF server detects that a + NETCONF session has terminated. + A server MAY optionally generate this event for + non-NETCONF management sessions. Indicates the + identity of the user that owned the session, + and why the session was terminated."; + + uses common-session-parms; + + leaf killed-by { + when "../termination-reason = 'killed'"; + type nc:session-id-type; + description + "The ID of the session that directly caused this session + to be abnormally terminated. If this session was abnormally + terminated by a non-NETCONF session unknown to the server, + then this leaf will not be present."; + } + + leaf termination-reason { + type enumeration { + enum "closed" { + description + "The session was terminated by the client in normal + fashion, e.g., by the NETCONF + protocol operation."; + } + enum "killed" { + description + "The session was terminated in abnormal + fashion, e.g., by the NETCONF + protocol operation."; + } + enum "dropped" { + description + "The session was terminated because the transport layer + connection was unexpectedly closed."; + } + enum "timeout" { + description + "The session was terminated because of inactivity, + e.g., waiting for the message or + messages."; + } + + + + enum "bad-hello" { + description + "The client's message was invalid."; + } + enum "other" { + description + "The session was terminated for some other reason."; + } + } + mandatory true; + description + "Reason the session was terminated."; + } + } // notification netconf-session-end + + + notification netconf-confirmed-commit { + description + "Generated when a NETCONF server detects that a + confirmed-commit event has occurred. Indicates the event + and the current state of the confirmed-commit procedure + in progress."; + reference + "RFC 6241, Section 8.4"; + + uses common-session-parms { + when "confirm-event != 'timeout'"; + } + + leaf confirm-event { + type enumeration { + enum "start" { + description + "The confirmed-commit procedure has started."; + } + enum "cancel" { + description + "The confirmed-commit procedure has been canceled, + e.g., due to the session being terminated, or an + explicit operation."; + } + enum "timeout" { + description + "The confirmed-commit procedure has been canceled + due to the confirm-timeout interval expiring. + The common session parameters will not be present + in this sub-mode."; + } + + enum "extend" { + description + "The confirmed-commit timeout has been extended, + e.g., by a new operation."; + } + enum "complete" { + description + "The confirmed-commit procedure has been completed."; + } + } + mandatory true; + description + "Indicates the event that caused the notification."; + } + + leaf timeout { + when + "../confirm-event = 'start' or ../confirm-event = 'extend'"; + type uint32; + units "seconds"; + description + "The configured timeout value if the event type + is 'start' or 'extend'. This value represents + the approximate number of seconds from the event + time when the 'timeout' event might occur."; + } + } // notification netconf-confirmed-commit + +} diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang new file mode 100644 index 0000000000..fb9aac133b --- /dev/null +++ b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang @@ -0,0 +1,95 @@ +module nc-notifications { + + namespace "urn:ietf:params:xml:ns:netmod:notification"; + prefix "manageEvent"; + + import ietf-yang-types{ prefix yang; } + import notifications { prefix ncEvent; } + + organization + "IETF NETCONF WG"; + + contact + "netconf@ietf.org"; + + description + "Conversion of the 'manageEvent' XSD in the NETCONF + Notifications RFC."; + + reference + "RFC 5277"; + + revision 2008-07-14 { + description "RFC 5277 version."; + } + + container netconf { + description "Top-level element in the notification namespace"; + + config false; + + container streams { + description + "The list of event streams supported by the system. When + a query is issued, the returned set of streams is + determined based on user privileges."; + + list stream { + description + "Stream name, description and other information."; + key name; + min-elements 1; + + leaf name { + description + "The name of the event stream. If this is the default + NETCONF stream, this must have the value 'NETCONF'."; + type ncEvent:streamNameType; + } + + leaf description { + description + "A description of the event stream, including such + information as the type of events that are sent over + this stream."; + type string; + mandatory true; + } + + leaf replaySupport { + description + "A description of the event stream, including such + information as the type of events that are sent over + this stream."; + type boolean; + mandatory true; + } + + leaf replayLogCreationTime { + description + "The timestamp of the creation of the log used to support + the replay function on this stream. Note that this might + be earlier then the earliest available notification in + the log. This object is updated if the log resets for + some reason. This object MUST be present if replay is + supported."; + type yang:date-and-time; // xsd:dateTime is wrong! + } + } + } + } + + notification replayComplete { + description + "This notification is sent to signal the end of a replay + portion of a subscription."; + } + + notification notificationComplete { + description + "This notification is sent to signal the end of a notification + subscription. It is sent in the case that stopTime was + specified during the creation of the subscription.."; + } + +} diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang new file mode 100644 index 0000000000..f107c2a8aa --- /dev/null +++ b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang @@ -0,0 +1,83 @@ +module notifications { + + namespace "urn:ietf:params:xml:ns:netconf:notification:1.0"; + prefix "ncEvent"; + + import ietf-yang-types { prefix yang; } + + organization + "IETF NETCONF WG"; + + contact + "netconf@ops.ietf.org"; + + description + "Conversion of the 'ncEvent' XSD in the + NETCONF Notifications RFC."; + + reference + "RFC 5277."; + + revision 2008-07-14 { + description "RFC 5277 version."; + } + + typedef streamNameType { + description + "The name of an event stream."; + type string; + } + + rpc create-subscription { + description + "The command to create a notification subscription. It + takes as argument the name of the notification stream + and filter. Both of those options limit the content of + the subscription. In addition, there are two time-related + parameters, startTime and stopTime, which can be used to + select the time interval of interest to the notification + replay feature."; + + input { + leaf stream { + description + "An optional parameter that indicates which stream of events + is of interest. If not present, then events in the default + NETCONF stream will be sent."; + type streamNameType; + default "NETCONF"; + } + + anyxml filter { + description + "An optional parameter that indicates which subset of all + possible events is of interest. The format of this + parameter is the same as that of the filter parameter + in the NETCONF protocol operations. If not present, + all events not precluded by other parameters will + be sent."; + } + + leaf startTime { + description + "A parameter used to trigger the replay feature and + indicates that the replay should start at the time + specified. If start time is not present, this is not a + replay subscription."; + type yang:date-and-time; + } + + leaf stopTime { + // must ". >= ../startTime"; + description + "An optional parameter used with the optional replay + feature to indicate the newest notifications of + interest. If stop time is not present, the notifications + will continue until the subscription is terminated. + Must be used with startTime."; + type yang:date-and-time; + } + } + } +} + diff --git a/opendaylight/netconf/ietf-netconf/pom.xml b/opendaylight/netconf/ietf-netconf/pom.xml new file mode 100644 index 0000000000..6ed7a5f130 --- /dev/null +++ b/opendaylight/netconf/ietf-netconf/pom.xml @@ -0,0 +1,56 @@ + + + + + 4.0.0 + + org.opendaylight.controller + netconf-subsystem + 0.3.0-SNAPSHOT + + ietf-netconf + bundle + ${project.artifactId} + + + + + com.google.guava + guava + + + org.opendaylight.yangtools.model + ietf-inet-types + + + org.slf4j + slf4j-api + + + + + + + + org.apache.felix + maven-bundle-plugin + + + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.* + + + + + org.opendaylight.yangtools + yang-maven-plugin + + + + + diff --git a/opendaylight/netconf/ietf-netconf/src/main/yang/ietf-netconf@2011-06-01.yang b/opendaylight/netconf/ietf-netconf/src/main/yang/ietf-netconf@2011-06-01.yang new file mode 100644 index 0000000000..4bbb1c2792 --- /dev/null +++ b/opendaylight/netconf/ietf-netconf/src/main/yang/ietf-netconf@2011-06-01.yang @@ -0,0 +1,928 @@ +module ietf-netconf { + + // the namespace for NETCONF XML definitions is unchanged + // from RFC 4741, which this document replaces + namespace "urn:ietf:params:xml:ns:netconf:base:1.0"; + + prefix nc; + + import ietf-inet-types { + prefix inet; + } + + organization + "IETF NETCONF (Network Configuration) Working Group"; + + contact + "WG Web: + WG List: + + WG Chair: Bert Wijnen + + + WG Chair: Mehmet Ersue + + + Editor: Martin Bjorklund + + + Editor: Juergen Schoenwaelder + + + Editor: Andy Bierman + "; + description + "NETCONF Protocol Data Types and Protocol Operations. + + Copyright (c) 2011 IETF Trust and the persons identified as + the document authors. All rights reserved. + + Redistribution and use in source and binary forms, with or + without modification, is permitted pursuant to, and subject + to the license terms contained in, the Simplified BSD License + set forth in Section 4.c of the IETF Trust's Legal Provisions + Relating to IETF Documents + (http://trustee.ietf.org/license-info). + + This version of this YANG module is part of RFC 6241; see + the RFC itself for full legal notices."; + + revision 2011-06-01 { + description + "Initial revision;"; + reference + "RFC 6241: Network Configuration Protocol"; + } + + extension get-filter-element-attributes { + description + "If this extension is present within an 'anyxml' + statement named 'filter', which must be conceptually + defined within the RPC input section for the + and protocol operations, then the + following unqualified XML attribute is supported + within the element, within a or + protocol operation: + + type : optional attribute with allowed + value strings 'subtree' and 'xpath'. + If missing, the default value is 'subtree'. + + If the 'xpath' feature is supported, then the + following unqualified XML attribute is + also supported: + + select: optional attribute containing a + string representing an XPath expression. + The 'type' attribute must be equal to 'xpath' + if this attribute is present."; + } + + // NETCONF capabilities defined as features + feature writable-running { + description + "NETCONF :writable-running capability; + If the server advertises the :writable-running + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + reference "RFC 6241, Section 8.2"; + } + + feature candidate { + description + "NETCONF :candidate capability; + If the server advertises the :candidate + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + reference "RFC 6241, Section 8.3"; + } + + feature confirmed-commit { + if-feature candidate; + description + "NETCONF :confirmed-commit:1.1 capability; + If the server advertises the :confirmed-commit:1.1 + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + + reference "RFC 6241, Section 8.4"; + } + + feature rollback-on-error { + description + "NETCONF :rollback-on-error capability; + If the server advertises the :rollback-on-error + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + reference "RFC 6241, Section 8.5"; + } + + feature validate { + description + "NETCONF :validate:1.1 capability; + If the server advertises the :validate:1.1 + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + reference "RFC 6241, Section 8.6"; + } + + feature startup { + description + "NETCONF :startup capability; + If the server advertises the :startup + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + reference "RFC 6241, Section 8.7"; + } + + feature url { + description + "NETCONF :url capability; + If the server advertises the :url + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + reference "RFC 6241, Section 8.8"; + } + + feature xpath { + description + "NETCONF :xpath capability; + If the server advertises the :xpath + capability for a session, then this feature must + also be enabled for that session. Otherwise, + this feature must not be enabled."; + reference "RFC 6241, Section 8.9"; + } + + // NETCONF Simple Types + + typedef session-id-type { + type uint32 { + range "1..max"; + } + description + "NETCONF Session Id"; + } + + typedef session-id-or-zero-type { + type uint32; + description + "NETCONF Session Id or Zero to indicate none"; + } + typedef error-tag-type { + type enumeration { + enum in-use { + description + "The request requires a resource that + already is in use."; + } + enum invalid-value { + description + "The request specifies an unacceptable value for one + or more parameters."; + } + enum too-big { + description + "The request or response (that would be generated) is + too large for the implementation to handle."; + } + enum missing-attribute { + description + "An expected attribute is missing."; + } + enum bad-attribute { + description + "An attribute value is not correct; e.g., wrong type, + out of range, pattern mismatch."; + } + enum unknown-attribute { + description + "An unexpected attribute is present."; + } + enum missing-element { + description + "An expected element is missing."; + } + enum bad-element { + description + "An element value is not correct; e.g., wrong type, + out of range, pattern mismatch."; + } + enum unknown-element { + description + "An unexpected element is present."; + } + enum unknown-namespace { + description + "An unexpected namespace is present."; + } + enum access-denied { + description + "Access to the requested protocol operation or + data model is denied because authorization failed."; + } + enum lock-denied { + description + "Access to the requested lock is denied because the + lock is currently held by another entity."; + } + enum resource-denied { + description + "Request could not be completed because of + insufficient resources."; + } + enum rollback-failed { + description + "Request to roll back some configuration change (via + rollback-on-error or operations) + was not completed for some reason."; + + } + enum data-exists { + description + "Request could not be completed because the relevant + data model content already exists. For example, + a 'create' operation was attempted on data that + already exists."; + } + enum data-missing { + description + "Request could not be completed because the relevant + data model content does not exist. For example, + a 'delete' operation was attempted on + data that does not exist."; + } + enum operation-not-supported { + description + "Request could not be completed because the requested + operation is not supported by this implementation."; + } + enum operation-failed { + description + "Request could not be completed because the requested + operation failed for some reason not covered by + any other error condition."; + } + enum partial-operation { + description + "This error-tag is obsolete, and SHOULD NOT be sent + by servers conforming to this document."; + } + enum malformed-message { + description + "A message could not be handled because it failed to + be parsed correctly. For example, the message is not + well-formed XML or it uses an invalid character set."; + } + } + description "NETCONF Error Tag"; + reference "RFC 6241, Appendix A"; + } + + typedef error-severity-type { + type enumeration { + enum error { + description "Error severity"; + } + enum warning { + description "Warning severity"; + } + } + description "NETCONF Error Severity"; + reference "RFC 6241, Section 4.3"; + } + + typedef edit-operation-type { + type enumeration { + enum merge { + description + "The configuration data identified by the + element containing this attribute is merged + with the configuration at the corresponding + level in the configuration datastore identified + by the target parameter."; + } + enum replace { + description + "The configuration data identified by the element + containing this attribute replaces any related + configuration in the configuration datastore + identified by the target parameter. If no such + configuration data exists in the configuration + datastore, it is created. Unlike a + operation, which replaces the + entire target configuration, only the configuration + actually present in the config parameter is affected."; + } + enum create { + description + "The configuration data identified by the element + containing this attribute is added to the + configuration if and only if the configuration + data does not already exist in the configuration + datastore. If the configuration data exists, an + element is returned with an + value of 'data-exists'."; + } + enum delete { + description + "The configuration data identified by the element + containing this attribute is deleted from the + configuration if and only if the configuration + data currently exists in the configuration + datastore. If the configuration data does not + exist, an element is returned with + an value of 'data-missing'."; + } + enum remove { + description + "The configuration data identified by the element + containing this attribute is deleted from the + configuration if the configuration + data currently exists in the configuration + datastore. If the configuration data does not + exist, the 'remove' operation is silently ignored + by the server."; + } + } + default "merge"; + description "NETCONF 'operation' attribute values"; + reference "RFC 6241, Section 7.2"; + } + + // NETCONF Standard Protocol Operations + + rpc get-config { + description + "Retrieve all or part of a specified configuration."; + + reference "RFC 6241, Section 7.1"; + + input { + container source { + description + "Particular configuration to retrieve."; + + choice config-source { + mandatory true; + description + "The configuration to retrieve."; + leaf candidate { + if-feature candidate; + type empty; + description + "The candidate configuration is the config source."; + } + leaf running { + type empty; + description + "The running configuration is the config source."; + } + leaf startup { + if-feature startup; + type empty; + description + "The startup configuration is the config source. + This is optional-to-implement on the server because + not all servers will support filtering for this + datastore."; + } + } + } + + anyxml filter { + description + "Subtree or XPath filter to use."; + nc:get-filter-element-attributes; + } + } + + output { + anyxml data { + description + "Copy of the source datastore subset that matched + the filter criteria (if any). An empty data container + indicates that the request did not produce any results."; + } + } + } + + rpc edit-config { + description + "The operation loads all or part of a specified + configuration to the specified target configuration."; + + reference "RFC 6241, Section 7.2"; + + input { + container target { + description + "Particular configuration to edit."; + + choice config-target { + mandatory true; + description + "The configuration target."; + + leaf candidate { + if-feature candidate; + type empty; + description + "The candidate configuration is the config target."; + } + leaf running { + if-feature writable-running; + type empty; + description + "The running configuration is the config source."; + } + } + } + + leaf default-operation { + type enumeration { + enum merge { + description + "The default operation is merge."; + } + enum replace { + description + "The default operation is replace."; + } + enum none { + description + "There is no default operation."; + } + } + default "merge"; + description + "The default operation to use."; + } + + leaf test-option { + if-feature validate; + type enumeration { + enum test-then-set { + description + "The server will test and then set if no errors."; + } + enum set { + description + "The server will set without a test first."; + } + + enum test-only { + description + "The server will only test and not set, even + if there are no errors."; + } + } + default "test-then-set"; + description + "The test option to use."; + } + + leaf error-option { + type enumeration { + enum stop-on-error { + description + "The server will stop on errors."; + } + enum continue-on-error { + description + "The server may continue on errors."; + } + enum rollback-on-error { + description + "The server will roll back on errors. + This value can only be used if the 'rollback-on-error' + feature is supported."; + } + } + default "stop-on-error"; + description + "The error option to use."; + } + + choice edit-content { + mandatory true; + description + "The content for the edit operation."; + + anyxml config { + description + "Inline Config content."; + } + leaf url { + if-feature url; + type inet:uri; + description + "URL-based config content."; + } + } + } + } + + rpc copy-config { + description + "Create or replace an entire configuration datastore with the + contents of another complete configuration datastore."; + + reference "RFC 6241, Section 7.3"; + + input { + container target { + description + "Particular configuration to copy to."; + + choice config-target { + mandatory true; + description + "The configuration target of the copy operation."; + + leaf candidate { + if-feature candidate; + type empty; + description + "The candidate configuration is the config target."; + } + leaf running { + if-feature writable-running; + type empty; + description + "The running configuration is the config target. + This is optional-to-implement on the server."; + } + leaf startup { + if-feature startup; + type empty; + description + "The startup configuration is the config target."; + } + leaf url { + if-feature url; + type inet:uri; + description + "The URL-based configuration is the config target."; + } + } + } + + container source { + description + "Particular configuration to copy from."; + + choice config-source { + mandatory true; + description + "The configuration source for the copy operation."; + + leaf candidate { + if-feature candidate; + type empty; + description + "The candidate configuration is the config source."; + } + leaf running { + type empty; + description + "The running configuration is the config source."; + } + leaf startup { + if-feature startup; + type empty; + description + "The startup configuration is the config source."; + } + leaf url { + if-feature url; + type inet:uri; + description + "The URL-based configuration is the config source."; + } + anyxml config { + description + "Inline Config content: element. Represents + an entire configuration datastore, not + a subset of the running datastore."; + } + } + } + } + } + + rpc delete-config { + description + "Delete a configuration datastore."; + + reference "RFC 6241, Section 7.4"; + + input { + container target { + description + "Particular configuration to delete."; + + choice config-target { + mandatory true; + description + "The configuration target to delete."; + + leaf startup { + if-feature startup; + type empty; + description + "The startup configuration is the config target."; + } + leaf url { + if-feature url; + type inet:uri; + description + "The URL-based configuration is the config target."; + } + } + } + } + } + + rpc lock { + description + "The lock operation allows the client to lock the configuration + system of a device."; + + reference "RFC 6241, Section 7.5"; + + input { + container target { + description + "Particular configuration to lock."; + + choice config-target { + mandatory true; + description + "The configuration target to lock."; + + leaf candidate { + if-feature candidate; + type empty; + description + "The candidate configuration is the config target."; + } + leaf running { + type empty; + description + "The running configuration is the config target."; + } + leaf startup { + if-feature startup; + type empty; + description + "The startup configuration is the config target."; + } + } + } + } + } + + rpc unlock { + description + "The unlock operation is used to release a configuration lock, + previously obtained with the 'lock' operation."; + + reference "RFC 6241, Section 7.6"; + + input { + container target { + description + "Particular configuration to unlock."; + + choice config-target { + mandatory true; + description + "The configuration target to unlock."; + + leaf candidate { + if-feature candidate; + type empty; + description + "The candidate configuration is the config target."; + } + leaf running { + type empty; + description + "The running configuration is the config target."; + } + leaf startup { + if-feature startup; + type empty; + description + "The startup configuration is the config target."; + } + } + } + } + } + + rpc get { + description + "Retrieve running configuration and device state information."; + + reference "RFC 6241, Section 7.7"; + + input { + anyxml filter { + description + "This parameter specifies the portion of the system + configuration and state data to retrieve."; + nc:get-filter-element-attributes; + } + } + + output { + anyxml data { + description + "Copy of the running datastore subset and/or state + data that matched the filter criteria (if any). + An empty data container indicates that the request did not + produce any results."; + } + } + } + + rpc close-session { + description + "Request graceful termination of a NETCONF session."; + + reference "RFC 6241, Section 7.8"; + } + + rpc kill-session { + description + "Force the termination of a NETCONF session."; + + reference "RFC 6241, Section 7.9"; + + input { + leaf session-id { + type session-id-type; + mandatory true; + description + "Particular session to kill."; + } + } + } + + rpc commit { + if-feature candidate; + + description + "Commit the candidate configuration as the device's new + current configuration."; + + reference "RFC 6241, Section 8.3.4.1"; + + input { + leaf confirmed { + if-feature confirmed-commit; + type empty; + description + "Requests a confirmed commit."; + reference "RFC 6241, Section 8.3.4.1"; + } + + leaf confirm-timeout { + if-feature confirmed-commit; + type uint32 { + range "1..max"; + } + units "seconds"; + default "600"; // 10 minutes + description + "The timeout interval for a confirmed commit."; + reference "RFC 6241, Section 8.3.4.1"; + } + + leaf persist { + if-feature confirmed-commit; + type string; + description + "This parameter is used to make a confirmed commit + persistent. A persistent confirmed commit is not aborted + if the NETCONF session terminates. The only way to abort + a persistent confirmed commit is to let the timer expire, + or to use the operation. + + The value of this parameter is a token that must be given + in the 'persist-id' parameter of or + operations in order to confirm or cancel + the persistent confirmed commit. + + The token should be a random string."; + reference "RFC 6241, Section 8.3.4.1"; + } + + leaf persist-id { + if-feature confirmed-commit; + type string; + description + "This parameter is given in order to commit a persistent + confirmed commit. The value must be equal to the value + given in the 'persist' parameter to the operation. + If it does not match, the operation fails with an + 'invalid-value' error."; + reference "RFC 6241, Section 8.3.4.1"; + } + + } + } + + rpc discard-changes { + if-feature candidate; + + description + "Revert the candidate configuration to the current + running configuration."; + reference "RFC 6241, Section 8.3.4.2"; + } + + rpc cancel-commit { + if-feature confirmed-commit; + description + "This operation is used to cancel an ongoing confirmed commit. + If the confirmed commit is persistent, the parameter + 'persist-id' must be given, and it must match the value of the + 'persist' parameter."; + reference "RFC 6241, Section 8.4.4.1"; + + input { + leaf persist-id { + type string; + description + "This parameter is given in order to cancel a persistent + confirmed commit. The value must be equal to the value + given in the 'persist' parameter to the operation. + If it does not match, the operation fails with an + 'invalid-value' error."; + } + } + } + + rpc validate { + if-feature validate; + + description + "Validates the contents of the specified configuration."; + + reference "RFC 6241, Section 8.6.4.1"; + + input { + container source { + description + "Particular configuration to validate."; + + choice config-source { + mandatory true; + description + "The configuration source to validate."; + + leaf candidate { + if-feature candidate; + type empty; + description + "The candidate configuration is the config source."; + } + leaf running { + type empty; + description + "The running configuration is the config source."; + } + leaf startup { + if-feature startup; + type empty; + description + "The startup configuration is the config source."; + } + leaf url { + if-feature url; + type inet:uri; + description + "The URL-based configuration is the config source."; + } + anyxml config { + description + "Inline Config content: element. Represents + an entire configuration datastore, not + a subset of the running datastore."; + } + } + } + } + } + +} diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java index 287ff2dca7..83e1f9129b 100644 --- a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java @@ -444,7 +444,9 @@ public class NetconfDeviceSimulator implements Closeable { final SimulatedEditConfig sEditConfig = new SimulatedEditConfig(String.valueOf(currentSessionId), storage); final SimulatedGetConfig sGetConfig = new SimulatedGetConfig(String.valueOf(currentSessionId), storage); final SimulatedCommit sCommit = new SimulatedCommit(String.valueOf(currentSessionId)); - return Sets.newHashSet(sGet, sGetConfig, sEditConfig, sCommit); + final SimulatedLock sLock = new SimulatedLock(String.valueOf(currentSessionId)); + final SimulatedUnLock sUnlock = new SimulatedUnLock(String.valueOf(currentSessionId)); + return Sets.newHashSet(sGet, sGetConfig, sEditConfig, sCommit, sLock, sUnlock); } @Override diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedLock.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedLock.java new file mode 100644 index 0000000000..4717e5464f --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedLock.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool; + +import com.google.common.base.Optional; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; +import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; +import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation; +import org.opendaylight.controller.netconf.util.xml.XmlElement; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +class SimulatedLock extends AbstractConfigNetconfOperation { + + SimulatedLock(final String netconfSessionIdForReporting) { + super(null, netconfSessionIdForReporting); + } + + @Override + protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException { + return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.absent()); + } + + @Override + protected String getOperationName() { + return "lock"; + } +} diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedUnLock.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedUnLock.java new file mode 100644 index 0000000000..31f9fc13ab --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedUnLock.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool; + +import com.google.common.base.Optional; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; +import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; +import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation; +import org.opendaylight.controller.netconf.util.xml.XmlElement; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +class SimulatedUnLock extends AbstractConfigNetconfOperation { + + SimulatedUnLock(final String netconfSessionIdForReporting) { + super(null, netconfSessionIdForReporting); + } + + @Override + protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException { + return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.absent()); + } + + @Override + protected String getOperationName() { + return "unlock"; + } +} diff --git a/opendaylight/netconf/pom.xml b/opendaylight/netconf/pom.xml index e1aa6ce3ed..2a5ba09673 100644 --- a/opendaylight/netconf/pom.xml +++ b/opendaylight/netconf/pom.xml @@ -13,9 +13,6 @@ 0.3.0-SNAPSHOT pom ${project.artifactId} - - 3.0.4 - netconf-api diff --git a/pom.xml b/pom.xml index 1217d72066..f588f3f17c 100644 --- a/pom.xml +++ b/pom.xml @@ -13,10 +13,6 @@ pom controller - - 3.0 - -